diff --git a/data/.lfs/franka_description.tar.gz b/data/.lfs/franka_description.tar.gz new file mode 100644 index 0000000000..67083e58b5 --- /dev/null +++ b/data/.lfs/franka_description.tar.gz @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:105a3404b7795dfc4a0eae879adcb125a8491e99cae81c56a92be5d7bac9ae72 +size 3892483 diff --git a/dimos/manipulation/manipulation_module.py b/dimos/manipulation/manipulation_module.py index 5ec58a2b74..bd1133535c 100644 --- a/dimos/manipulation/manipulation_module.py +++ b/dimos/manipulation/manipulation_module.py @@ -27,10 +27,11 @@ from enum import Enum import threading import time -from typing import TYPE_CHECKING, Any, TypeAlias +from typing import TYPE_CHECKING, Any, Self, TypeAlias +import warnings import numpy as np -from pydantic import Field +from pydantic import Field, model_validator from dimos.agents.annotation import skill from dimos.agents.skill_result import SkillResult @@ -38,13 +39,22 @@ from dimos.core.core import rpc from dimos.core.module import Module, ModuleConfig from dimos.core.stream import In -from dimos.manipulation.planning.factory import create_kinematics, create_planner +from dimos.manipulation.planning.factory import ( + create_kinematics, + create_planner, + validate_planning_stack_config, +) from dimos.manipulation.planning.kinematics.config import ( JacobianKinematicsConfig, ManipulationKinematicsConfig, kinematics_config_from_name, ) from dimos.manipulation.planning.monitor.world_monitor import WorldMonitor +from dimos.manipulation.planning.planners.config import ( + MANIPULATION_PLANNER_CONFIG_ADAPTER, + ManipulationPlannerConfig, + RRTConnectPlannerConfig, +) from dimos.manipulation.planning.spec.config import RobotModelConfig from dimos.manipulation.planning.spec.enums import IKStatus, ObstacleType from dimos.manipulation.planning.spec.models import ( @@ -58,8 +68,11 @@ from dimos.manipulation.planning.trajectory_generator.joint_trajectory_generator import ( JointTrajectoryGenerator, ) +from dimos.manipulation.planning.vamp.errors import UnsupportedWorldCapabilityError +from dimos.manipulation.planning.world.config import DrakeWorldConfig, ManipulationWorldConfig from dimos.manipulation.skill_errors import ManipulationSkillError from dimos.msgs.geometry_msgs.Pose import Pose +from dimos.msgs.geometry_msgs.PoseStamped import PoseStamped from dimos.msgs.geometry_msgs.Quaternion import Quaternion from dimos.msgs.geometry_msgs.Vector3 import Vector3 from dimos.msgs.sensor_msgs.JointState import JointState @@ -101,7 +114,10 @@ class ManipulationModuleConfig(ModuleConfig): robots: list[RobotModelConfig] = Field(default_factory=list) planning_timeout: float = 10.0 enable_viz: bool = False - planner_name: str = "rrt_connect" # "rrt_connect" + world: ManipulationWorldConfig = Field(default_factory=DrakeWorldConfig) + planner: ManipulationPlannerConfig = Field(default_factory=RRTConnectPlannerConfig) + # Deprecated: use planner.backend instead. + planner_name: str | None = None kinematics: ManipulationKinematicsConfig = Field(default_factory=JacobianKinematicsConfig) # Deprecated: use kinematics.backend instead. kinematics_name: str | None = None # "jacobian", "drake_optimization", or "pink" @@ -110,6 +126,29 @@ class ManipulationModuleConfig(ModuleConfig): # Set to None to disable. floor_z: float | None = None + @model_validator(mode="after") + def apply_legacy_flat_backend_fields(self) -> Self: + """Apply deprecated flat backend fields as noisy compatibility shims.""" + if self.planner_name is not None: + warnings.warn( + "ManipulationModuleConfig.planner_name is deprecated; use " + "planner={'backend': ...} instead.", + DeprecationWarning, + stacklevel=3, + ) + self.planner = MANIPULATION_PLANNER_CONFIG_ADAPTER.validate_python( + {"backend": self.planner_name} + ) + if self.kinematics_name is not None: + warnings.warn( + "ManipulationModuleConfig.kinematics_name is deprecated; use " + "kinematics={'backend': ...} instead.", + DeprecationWarning, + stacklevel=3, + ) + self.kinematics = kinematics_config_from_name(self.kinematics_name) + return self + class ManipulationModule(Module): """Base motion planning module with ControlCoordinator execution. @@ -178,7 +217,16 @@ def _initialize_planning(self) -> None: logger.warning("No robots configured, planning disabled") return - self._world_monitor = WorldMonitor(enable_viz=self.config.enable_viz) + validate_planning_stack_config( + world=self.config.world, + planner=self.config.planner, + kinematics=self.config.kinematics, + ) + + self._world_monitor = WorldMonitor( + config=self.config.world, + enable_viz=self.config.enable_viz, + ) for robot_config in self.config.robots: robot_id = self._world_monitor.add_robot(robot_config) @@ -216,11 +264,8 @@ def _initialize_planning(self) -> None: if url := self._world_monitor.get_visualization_url(): logger.info(f"Visualization: {url}") - self._planner = create_planner(name=self.config.planner_name) - kinematics_config = self.config.kinematics - if self.config.kinematics_name is not None: - kinematics_config = kinematics_config_from_name(self.config.kinematics_name) - self._kinematics = create_kinematics(config=kinematics_config) + self._planner = create_planner(config=self.config.planner) + self._kinematics = create_kinematics(config=self.config.kinematics) # Start TF publishing thread if any robot has tf_extra_links if any(c.tf_extra_links for _, c, _ in self._robots.values()): @@ -470,22 +515,22 @@ def _solve_ik_for_pose( """Run the configured kinematics backend for a world-frame pose.""" assert self._world_monitor and self._kinematics - # Convert Pose to PoseStamped for the IK solver - from dimos.msgs.geometry_msgs.PoseStamped import PoseStamped - target_pose = PoseStamped( frame_id="world", position=pose.position, orientation=pose.orientation, ) - return self._kinematics.solve( - world=self._world_monitor.world, - robot_id=robot_id, - target_pose=target_pose, - seed=seed, - check_collision=check_collision, - ) + try: + return self._kinematics.solve( + world=self._world_monitor.world, + robot_id=robot_id, + target_pose=target_pose, + seed=seed, + check_collision=check_collision, + ) + except UnsupportedWorldCapabilityError as exc: + return IKResult(status=IKStatus.NO_SOLUTION, message=str(exc)) @rpc def solve_ik( @@ -548,7 +593,8 @@ def plan_to_pose(self, pose: Pose, robot_name: RobotName | None = None) -> bool: ik = self._solve_ik_for_pose(robot_id, pose, current, check_collision=True) if not ik.is_success() or ik.joint_state is None: - return self._fail(f"IK failed: {ik.status.name}") + detail = f": {ik.message}" if ik.message else "" + return self._fail(f"IK failed: {ik.status.name}{detail}") logger.info(f"IK solved, error: {ik.position_error:.4f}m") return self._plan_path_only(robot_name, robot_id, ik.joint_state) diff --git a/dimos/manipulation/planning/factory.py b/dimos/manipulation/planning/factory.py index 915feef8b1..7a1b62bff4 100644 --- a/dimos/manipulation/planning/factory.py +++ b/dimos/manipulation/planning/factory.py @@ -25,6 +25,18 @@ PinkKinematicsConfig, kinematics_config_from_name, ) +from dimos.manipulation.planning.planners.config import ( + MANIPULATION_PLANNER_CONFIG_ADAPTER, + ManipulationPlannerConfig, + RRTConnectPlannerConfig, + VampPlannerConfig, +) +from dimos.manipulation.planning.world.config import ( + MANIPULATION_WORLD_CONFIG_ADAPTER, + DrakeWorldConfig, + ManipulationWorldConfig, + VampWorldConfig, +) if TYPE_CHECKING: from dimos.manipulation.planning.spec.protocols import KinematicsSpec, PlannerSpec, WorldSpec @@ -32,16 +44,23 @@ def create_world( backend: str = "drake", + config: ManipulationWorldConfig | None = None, enable_viz: bool = False, **kwargs: Any, ) -> WorldSpec: - """Create a world instance. backend='drake', enable_viz for Meshcat.""" - if backend == "drake": + """Create a world instance from a backend name or typed world config.""" + if config is None: + config = MANIPULATION_WORLD_CONFIG_ADAPTER.validate_python({"backend": backend}) + + if isinstance(config, DrakeWorldConfig): from dimos.manipulation.planning.world.drake_world import DrakeWorld return DrakeWorld(enable_viz=enable_viz, **kwargs) - else: - raise ValueError(f"Unknown backend: {backend}. Available: ['drake']") + if isinstance(config, VampWorldConfig): + from dimos.manipulation.planning.world.vamp_world import VampWorld + + return VampWorld(config=config, **kwargs) + raise TypeError(f"Unsupported world config: {type(config).__name__}") def create_kinematics( @@ -73,30 +92,72 @@ def create_kinematics( def create_planner( name: str = "rrt_connect", + config: ManipulationPlannerConfig | None = None, **kwargs: Any, ) -> PlannerSpec: - """Create motion planner. name='rrt_connect'.""" - if name == "rrt_connect": + """Create motion planner from a backend name or typed planner config.""" + if config is None: + config = MANIPULATION_PLANNER_CONFIG_ADAPTER.validate_python({"backend": name}) + + if isinstance(config, RRTConnectPlannerConfig): from dimos.manipulation.planning.planners.rrt_planner import RRTConnectPlanner - return RRTConnectPlanner(**kwargs) - else: - raise ValueError(f"Unknown planner: {name}. Available: ['rrt_connect']") + return RRTConnectPlanner( + step_size=config.step_size, + connect_step_size=config.connect_step_size, + goal_tolerance=config.goal_tolerance, + collision_step_size=config.collision_step_size, + **kwargs, + ) + if isinstance(config, VampPlannerConfig): + from dimos.manipulation.planning.planners.vamp_planner import VampPlanner + + return VampPlanner(config=config, **kwargs) + raise TypeError(f"Unsupported planner config: {type(config).__name__}") + + +def validate_planning_stack_config( + world: ManipulationWorldConfig, + planner: ManipulationPlannerConfig, + kinematics: ManipulationKinematicsConfig, +) -> None: + """Validate that selected world, planner, and kinematics backends can pair.""" + if isinstance(planner, VampPlannerConfig) and not isinstance(world, VampWorldConfig): + raise ValueError("VAMP planner requires world backend 'vamp'") + if isinstance(world, VampWorldConfig) and not isinstance(planner, VampPlannerConfig): + raise ValueError("VAMP world backend requires planner backend 'vamp'") + if isinstance(kinematics, DrakeOptimizationKinematicsConfig) and not isinstance( + world, DrakeWorldConfig + ): + raise ValueError("Drake optimization kinematics requires world backend 'drake'") def create_planning_stack( robot_config: Any, enable_viz: bool = False, + world: ManipulationWorldConfig | None = None, planner_name: str = "rrt_connect", + planner: ManipulationPlannerConfig | None = None, kinematics_name: str = "jacobian", kinematics: ManipulationKinematicsConfig | None = None, ) -> tuple[WorldSpec, KinematicsSpec, PlannerSpec, str]: """Create complete planning stack. Returns (world, kinematics, planner, robot_id).""" - world = create_world(backend="drake", enable_viz=enable_viz) - kinematics_solver = create_kinematics(name=kinematics_name, config=kinematics) - planner = create_planner(name=planner_name) - - robot_id = world.add_robot(robot_config) - world.finalize() - - return world, kinematics_solver, planner, robot_id + world_config = world if world is not None else DrakeWorldConfig() + planner_config = ( + planner + if planner is not None + else MANIPULATION_PLANNER_CONFIG_ADAPTER.validate_python({"backend": planner_name}) + ) + kinematics_config = ( + kinematics if kinematics is not None else kinematics_config_from_name(kinematics_name) + ) + validate_planning_stack_config(world_config, planner_config, kinematics_config) + + world_backend = create_world(config=world_config, enable_viz=enable_viz) + kinematics_solver = create_kinematics(name=kinematics_name, config=kinematics_config) + planner_backend = create_planner(name=planner_name, config=planner_config) + + robot_id = world_backend.add_robot(robot_config) + world_backend.finalize() + + return world_backend, kinematics_solver, planner_backend, robot_id diff --git a/dimos/manipulation/planning/monitor/world_monitor.py b/dimos/manipulation/planning/monitor/world_monitor.py index 0bcb032b3a..90eff53f31 100644 --- a/dimos/manipulation/planning/monitor/world_monitor.py +++ b/dimos/manipulation/planning/monitor/world_monitor.py @@ -25,6 +25,10 @@ from dimos.manipulation.planning.monitor.robot_state_monitor import RobotStateMonitor from dimos.manipulation.planning.monitor.world_obstacle_monitor import WorldObstacleMonitor from dimos.manipulation.planning.spec.protocols import VisualizationSpec +from dimos.manipulation.planning.world.config import ( + MANIPULATION_WORLD_CONFIG_ADAPTER, + ManipulationWorldConfig, +) from dimos.msgs.geometry_msgs.PoseStamped import PoseStamped from dimos.msgs.sensor_msgs.JointState import JointState from dimos.utils.logging_config import setup_logger @@ -55,11 +59,17 @@ class WorldMonitor: def __init__( self, backend: str = "drake", + config: ManipulationWorldConfig | None = None, enable_viz: bool = False, **kwargs: Any, ) -> None: - self._backend = backend - self._world: WorldSpec = create_world(backend=backend, enable_viz=enable_viz, **kwargs) + world_config = ( + config + if config is not None + else MANIPULATION_WORLD_CONFIG_ADAPTER.validate_python({"backend": backend}) + ) + self._backend = world_config.backend + self._world: WorldSpec = create_world(config=world_config, enable_viz=enable_viz, **kwargs) self._visualization: VisualizationSpec | None = ( self._world if isinstance(self._world, VisualizationSpec) else None ) diff --git a/dimos/manipulation/planning/planners/config.py b/dimos/manipulation/planning/planners/config.py new file mode 100644 index 0000000000..3b83357d0f --- /dev/null +++ b/dimos/manipulation/planning/planners/config.py @@ -0,0 +1,60 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Configuration models for manipulation planner backends.""" + +from __future__ import annotations + +from typing import Annotated, Literal + +from pydantic import Field, TypeAdapter + +from dimos.protocol.service.spec import BaseConfig + + +class RRTConnectPlannerConfig(BaseConfig): + """Configuration for the backend-agnostic RRT-Connect planner.""" + + backend: Literal["rrt_connect"] = "rrt_connect" + step_size: float = 0.1 + connect_step_size: float = 0.05 + goal_tolerance: float = 0.1 + collision_step_size: float = 0.02 + + +class VampPlannerConfig(BaseConfig): + """Configuration for the VAMP-native joint-space planner adapter.""" + + backend: Literal["vamp"] = "vamp" + algorithm: Literal["rrtc", "prm", "fcit", "aorrtc"] = "rrtc" + simplify: bool = True + validate_path: bool = True + + +ManipulationPlannerConfig = Annotated[ + RRTConnectPlannerConfig | VampPlannerConfig, + Field(discriminator="backend"), +] + +MANIPULATION_PLANNER_CONFIG_ADAPTER: TypeAdapter[ManipulationPlannerConfig] = TypeAdapter( + ManipulationPlannerConfig +) + + +__all__ = [ + "MANIPULATION_PLANNER_CONFIG_ADAPTER", + "ManipulationPlannerConfig", + "RRTConnectPlannerConfig", + "VampPlannerConfig", +] diff --git a/dimos/manipulation/planning/planners/vamp_planner.py b/dimos/manipulation/planning/planners/vamp_planner.py new file mode 100644 index 0000000000..5acc7c8938 --- /dev/null +++ b/dimos/manipulation/planning/planners/vamp_planner.py @@ -0,0 +1,47 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""VAMP-native joint-space planner adapter.""" + +from __future__ import annotations + +from dimos.manipulation.planning.planners.config import VampPlannerConfig +from dimos.manipulation.planning.spec.models import PlanningResult, WorldRobotID +from dimos.manipulation.planning.spec.protocols import WorldSpec +from dimos.manipulation.planning.world.vamp_world import VampWorld +from dimos.msgs.sensor_msgs.JointState import JointState + + +class VampPlanner: + """Joint-space planner adapter for VAMP robot modules.""" + + def __init__(self, config: VampPlannerConfig) -> None: + self.config = config + + def plan_joint_path( + self, + world: WorldSpec, + robot_id: WorldRobotID, + start: JointState, + goal: JointState, + timeout: float = 10.0, + ) -> PlanningResult: + """Plan a VAMP-native joint-space path.""" + if not isinstance(world, VampWorld): + raise ValueError("VampPlanner requires VampWorld") + return world.plan_joint_path(self.config, robot_id, start, goal, timeout=timeout) + + def get_name(self) -> str: + """Get planner name.""" + return f"VAMP/{self.config.algorithm}" diff --git a/dimos/manipulation/planning/vamp/errors.py b/dimos/manipulation/planning/vamp/errors.py new file mode 100644 index 0000000000..6070353d40 --- /dev/null +++ b/dimos/manipulation/planning/vamp/errors.py @@ -0,0 +1,32 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Errors for the optional VAMP manipulation planning backend.""" + + +class VampDependencyError(ImportError): + """Raised when the VAMP backend is selected without the optional dependency.""" + + def __init__(self) -> None: + super().__init__( + "VAMP planning backend requires the optional 'vamp-planner' dependency. " + "Install the DimOS VAMP extra or install vamp-planner in this environment." + ) + + +class UnsupportedWorldCapabilityError(NotImplementedError): + """Raised when a world backend does not natively support a requested capability.""" + + def __init__(self, backend: str, capability: str) -> None: + super().__init__(f"World backend '{backend}' does not support capability: {capability}") diff --git a/dimos/manipulation/planning/vamp/loader.py b/dimos/manipulation/planning/vamp/loader.py new file mode 100644 index 0000000000..95c07fcaa7 --- /dev/null +++ b/dimos/manipulation/planning/vamp/loader.py @@ -0,0 +1,102 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Direct optional imports for VAMP robot artifacts.""" + +from __future__ import annotations + +import importlib +import importlib.util +from pathlib import Path +import sys +from typing import cast + +from dimos.manipulation.planning.vamp.errors import VampDependencyError +from dimos.manipulation.planning.world.config import ( + CustomVampArtifactConfig, + OfficialVampArtifactConfig, + VampArtifactConfig, +) + +_VAMP_IMPORT_ERROR: ImportError | None + +try: + import vamp + import vamp.baxter + import vamp.fetch + import vamp.panda + import vamp.sphere + import vamp.ur5 +except ImportError as exc: + _VAMP_IMPORT_ERROR = exc +else: + _VAMP_IMPORT_ERROR = None + + +def require_vamp() -> None: + """Raise with install guidance when the optional VAMP package is unavailable.""" + if _VAMP_IMPORT_ERROR is not None: + raise VampDependencyError() from _VAMP_IMPORT_ERROR + + +def load_vamp_robot_module(artifact: VampArtifactConfig) -> vamp.RobotModule: + """Load the configured VAMP robot module.""" + require_vamp() + if isinstance(artifact, OfficialVampArtifactConfig): + return _load_official_robot_module(artifact.robot) + if isinstance(artifact, CustomVampArtifactConfig): + return _load_custom_robot_module(artifact.path) + raise TypeError(f"Unsupported VAMP artifact config: {type(artifact).__name__}") + + +def _load_official_robot_module(robot: str) -> vamp.RobotModule: + match robot: + case "baxter": + return cast("vamp.RobotModule", vamp.baxter) + case "fetch": + return cast("vamp.RobotModule", vamp.fetch) + case "panda": + return cast("vamp.RobotModule", vamp.panda) + case "sphere": + return cast("vamp.RobotModule", vamp.sphere) + case "ur5": + return cast("vamp.RobotModule", vamp.ur5) + case _: + raise ValueError(f"Installed VAMP package does not expose robot artifact '{robot}'") + + +def _load_custom_robot_module(path: Path) -> vamp.RobotModule: + artifact_path = path.expanduser().resolve() + if not artifact_path.exists(): + raise FileNotFoundError(f"VAMP custom artifact path does not exist: {artifact_path}") + + if artifact_path.is_dir(): + parent = str(artifact_path.parent) + should_remove_parent = parent not in sys.path + if should_remove_parent: + sys.path.insert(0, parent) + try: + return cast("vamp.RobotModule", importlib.import_module(artifact_path.name)) + finally: + if should_remove_parent: + sys.path.remove(parent) + + module_name = artifact_path.stem + spec = importlib.util.spec_from_file_location(module_name, artifact_path) + if spec is None or spec.loader is None: + raise ImportError(f"Could not load VAMP custom artifact module: {artifact_path}") + module = importlib.util.module_from_spec(spec) + sys.modules[module_name] = module + spec.loader.exec_module(module) + return cast("vamp.RobotModule", module) diff --git a/dimos/manipulation/planning/vamp/test_vamp_backend.py b/dimos/manipulation/planning/vamp/test_vamp_backend.py new file mode 100644 index 0000000000..93e26d92d1 --- /dev/null +++ b/dimos/manipulation/planning/vamp/test_vamp_backend.py @@ -0,0 +1,428 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tests for the optional VAMP planning backend adapters.""" + +from __future__ import annotations + +from collections.abc import Callable +from dataclasses import dataclass +from pathlib import Path +import sys +from types import ModuleType + +import numpy as np +from numpy.typing import NDArray +from pydantic import ValidationError +import pytest + +from dimos.manipulation.planning.factory import create_planner, create_world +from dimos.manipulation.planning.planners.config import RRTConnectPlannerConfig, VampPlannerConfig +from dimos.manipulation.planning.planners.vamp_planner import VampPlanner +from dimos.manipulation.planning.spec.config import RobotModelConfig +from dimos.manipulation.planning.spec.enums import PlanningStatus +from dimos.manipulation.planning.vamp.errors import ( + UnsupportedWorldCapabilityError, + VampDependencyError, +) +from dimos.manipulation.planning.vamp.loader import load_vamp_robot_module +from dimos.manipulation.planning.world.config import ( + CustomVampArtifactConfig, + OfficialVampArtifactConfig, + VampWorldConfig, +) +from dimos.manipulation.planning.world.vamp_world import VampWorld +from dimos.msgs.geometry_msgs.PoseStamped import PoseStamped +from dimos.msgs.geometry_msgs.Quaternion import Quaternion +from dimos.msgs.geometry_msgs.Vector3 import Vector3 +from dimos.msgs.sensor_msgs.JointState import JointState + + +@dataclass(frozen=True) +class FakeSphere: + center: list[float] + radius: float + + +@dataclass(frozen=True) +class FakeCuboid: + center: list[float] + euler_xyz: list[float] + half_extents: list[float] + + +@dataclass(frozen=True) +class FakeCylinder: + center: list[float] + euler_xyz: list[float] + radius: float + length: float + + +@dataclass(frozen=True) +class FakePlannerSettings: + max_iterations: int + + +@dataclass(frozen=True) +class FakeSimplifySettings: + enabled: bool = True + + +@dataclass(frozen=True) +class FakeSampler: + name: str + + +class FakeEnvironment: + """Small fake VAMP environment that records converted primitives.""" + + def __init__(self) -> None: + self.spheres: list[FakeSphere] = [] + self.cuboids: list[FakeCuboid] = [] + self.capsules: list[FakeCylinder] = [] + + def add_sphere(self, sphere: FakeSphere) -> None: + self.spheres.append(sphere) + + def add_cuboid(self, cuboid: FakeCuboid) -> None: + self.cuboids.append(cuboid) + + def add_capsule(self, capsule: FakeCylinder) -> None: + self.capsules.append(capsule) + + +class FakePath: + """Fake VAMP path value exposing the numpy() method used by bindings.""" + + def __init__(self, waypoints: list[list[float]]) -> None: + self._waypoints = np.array(waypoints, dtype=np.float64) + + def numpy(self) -> NDArray[np.float64]: + return self._waypoints + + +class FakePlanningResult: + """Fake VAMP planning result.""" + + def __init__(self, solved: bool, path: FakePath, iterations: int = 7) -> None: + self.solved = solved + self.path = path + self.iterations = iterations + + +class FakeRobotModule(ModuleType): + """Fake official VAMP robot module.""" + + def __init__(self) -> None: + super().__init__("vamp.panda") + self.validate_calls: list[tuple[list[float], bool]] = [] + self.motion_calls: list[tuple[list[float], list[float], bool]] = [] + self.simplify_calls: list[FakePath] = [] + self.valid = True + self.motion_valid = True + + def halton(self) -> FakeSampler: + return FakeSampler("fake_sampler") + + def validate( + self, configuration: list[float], environment: FakeEnvironment, check_bounds: bool + ) -> bool: + del environment + self.validate_calls.append((configuration, check_bounds)) + return self.valid + + def validate_motion( + self, + configuration_in: list[float], + configuration_out: list[float], + environment: FakeEnvironment, + check_bounds: bool, + ) -> bool: + del environment + self.motion_calls.append((configuration_in, configuration_out, check_bounds)) + return self.motion_valid + + def eefk(self, configuration: list[float]) -> np.ndarray: + transform = np.eye(4, dtype=np.float64) + transform[0, 3] = configuration[0] + transform[1, 3] = configuration[1] + transform[2, 3] = configuration[2] + return transform + + def simplify( + self, + path: FakePath, + environment: FakeEnvironment, + settings: FakeSimplifySettings, + sampler: FakeSampler, + ) -> FakePlanningResult: + del environment, settings, sampler + self.simplify_calls.append(path) + return FakePlanningResult(True, FakePath([[0.0, 0.0, 0.0], [1.0, 0.5, 0.25]]), 2) + + +class FakeVampModule(ModuleType): + """Fake top-level VAMP package module.""" + + def __init__(self, robot_module: FakeRobotModule) -> None: + super().__init__("vamp") + self.panda = robot_module + self.Environment = FakeEnvironment + self.Sphere = FakeSphere + self.Cuboid = FakeCuboid + self.Cylinder = FakeCylinder + self.configure_calls: list[tuple[str, str, int]] = [] + self.planner_calls: list[tuple[list[float], list[float], FakeSampler]] = [] + self.planner_solved = True + + def configure_robot_and_planner_with_kwargs( + self, robot_name: str, planner_name: str, max_iterations: int + ) -> tuple[ + FakeRobotModule, + Callable[ + [ + list[float], + list[float], + FakeEnvironment, + FakePlannerSettings, + FakeSampler, + ], + FakePlanningResult, + ], + FakePlannerSettings, + FakeSimplifySettings, + ]: + self.configure_calls.append((robot_name, planner_name, max_iterations)) + + def planner_func( + start: list[float], + goal: list[float], + environment: FakeEnvironment, + settings: FakePlannerSettings, + sampler: FakeSampler, + ) -> FakePlanningResult: + del environment, settings + self.planner_calls.append((start, goal, sampler)) + return FakePlanningResult( + self.planner_solved, + FakePath([start, [0.5, 0.25, 0.125], goal]), + 11, + ) + + return ( + self.panda, + planner_func, + FakePlannerSettings(max_iterations=max_iterations), + FakeSimplifySettings(), + ) + + +@pytest.fixture +def fake_vamp_modules(mocker) -> tuple[FakeVampModule, FakeRobotModule]: + """Install fake VAMP modules into sys.modules.""" + robot_module = FakeRobotModule() + vamp_module = FakeVampModule(robot_module) + mocker.patch.dict(sys.modules, {"vamp": vamp_module, "vamp.panda": robot_module}) + mocker.patch("dimos.manipulation.planning.vamp.loader.vamp", vamp_module, create=True) + mocker.patch("dimos.manipulation.planning.world.vamp_world.vamp", vamp_module, create=True) + mocker.patch("dimos.manipulation.planning.vamp.loader._VAMP_IMPORT_ERROR", None) + return vamp_module, robot_module + + +def robot_config() -> RobotModelConfig: + """Create a minimal robot model config for VAMP adapter tests.""" + return RobotModelConfig( + name="panda", + model_path=Path("/tmp/panda.urdf"), + base_pose=PoseStamped(position=Vector3(), orientation=Quaternion()), + joint_names=["joint1", "joint2", "joint3"], + end_effector_link="panda_hand", + base_link="panda_link0", + joint_limits_lower=[-1.0, -1.0, -1.0], + joint_limits_upper=[1.0, 1.0, 1.0], + home_joints=[0.0, 0.0, 0.0], + ) + + +def finalized_vamp_world() -> VampWorld: + """Create a finalized fake-backed VAMP world.""" + world = VampWorld(VampWorldConfig()) + world.add_robot(robot_config()) + world.finalize() + return world + + +def test_vamp_dependency_error_has_install_guidance(mocker) -> None: + """Selecting VAMP without the optional package raises an actionable error.""" + mocker.patch( + "dimos.manipulation.planning.vamp.loader._VAMP_IMPORT_ERROR", + ImportError("No module named 'vamp'"), + ) + + with pytest.raises(VampDependencyError, match="vamp-planner"): + load_vamp_robot_module(OfficialVampArtifactConfig(robot="panda")) + + +def test_rrt_planner_creation_works_when_vamp_unavailable(mocker) -> None: + """Default planner creation still works when the optional VAMP package is unavailable.""" + mocker.patch( + "dimos.manipulation.planning.vamp.loader._VAMP_IMPORT_ERROR", + ImportError("No module named 'vamp'"), + ) + + planner = create_planner(config=RRTConnectPlannerConfig()) + + assert planner.get_name() == "RRTConnect" + + +def test_vamp_config_rejects_invalid_algorithm() -> None: + """VAMP planner config validates the finite algorithm set.""" + with pytest.raises(ValidationError, match="algorithm"): + VampPlannerConfig.model_validate({"backend": "vamp", "algorithm": "invalid"}) + + +def test_official_vamp_artifact_loading_uses_installed_robot_module(fake_vamp_modules) -> None: + """Official artifact mode loads robot modules exposed by the VAMP package.""" + _, robot_module = fake_vamp_modules + + loaded_robot = load_vamp_robot_module(OfficialVampArtifactConfig(robot="panda")) + + assert loaded_robot is robot_module + + +def test_custom_vamp_artifact_loading_uses_explicit_module_path( + fake_vamp_modules, tmp_path +) -> None: + """Custom artifact mode imports a user-prepared local Python robot module.""" + assert fake_vamp_modules is not None + artifact_path = tmp_path / "custom_panda.py" + artifact_path.write_text("ROBOT_NAME = 'custom_panda'\n", encoding="utf-8") + + loaded_robot = load_vamp_robot_module(CustomVampArtifactConfig(path=artifact_path)) + + assert isinstance(loaded_robot, ModuleType) + assert loaded_robot.ROBOT_NAME == "custom_panda" + + +def test_custom_vamp_artifact_directory_import_restores_sys_path( + fake_vamp_modules, tmp_path +) -> None: + """Directory artifact imports do not permanently change module resolution order.""" + assert fake_vamp_modules is not None + artifact_dir = tmp_path / "custom_robot" + artifact_dir.mkdir() + (artifact_dir / "__init__.py").write_text("ROBOT_NAME = 'custom_robot'\n", encoding="utf-8") + parent = str(tmp_path) + assert parent not in sys.path + + try: + loaded_robot = load_vamp_robot_module(CustomVampArtifactConfig(path=artifact_dir)) + + assert isinstance(loaded_robot, ModuleType) + assert loaded_robot.ROBOT_NAME == "custom_robot" + assert parent not in sys.path + finally: + sys.modules.pop("custom_robot", None) + + +def test_create_world_and_planner_from_vamp_configs(fake_vamp_modules) -> None: + """Factory functions create VAMP world and planner adapters from typed configs.""" + world = create_world(config=VampWorldConfig()) + planner = create_planner(config=VampPlannerConfig(algorithm="fcit")) + + assert isinstance(world, VampWorld) + assert isinstance(planner, VampPlanner) + assert planner.get_name() == "VAMP/fcit" + + +def test_vamp_world_validity_fk_and_unsupported_jacobian(fake_vamp_modules) -> None: + """VAMP world delegates native validity/FK and rejects unsupported Jacobian.""" + _, robot_module = fake_vamp_modules + world = finalized_vamp_world() + robot_id = world.get_robot_ids()[0] + state = JointState(name=["joint1", "joint2", "joint3"], position=[0.1, 0.2, 0.3]) + + assert world.check_config_collision_free(robot_id, state) + assert robot_module.validate_calls[-1] == ([0.1, 0.2, 0.3], True) + + with world.scratch_context() as ctx: + world.set_joint_state(ctx, robot_id, state) + ee_pose = world.get_ee_pose(ctx, robot_id) + assert ee_pose.x == pytest.approx(0.1) + assert ee_pose.y == pytest.approx(0.2) + assert ee_pose.z == pytest.approx(0.3) + with pytest.raises(UnsupportedWorldCapabilityError, match="Jacobian"): + world.get_jacobian(ctx, robot_id) + + +def test_vamp_world_accepts_one_robot(fake_vamp_modules) -> None: + """VAMP world exposes WorldSpec robot APIs through a single robot slot.""" + world = VampWorld(VampWorldConfig()) + config = robot_config() + + robot_id = world.add_robot(config) + + assert robot_id == "robot_1" + assert world.get_robot_ids() == ["robot_1"] + assert world.get_robot_config(robot_id) is config + with pytest.raises(ValueError, match="one robot"): + world.add_robot(robot_config()) + + +def test_vamp_planner_dispatches_algorithm_simplifies_and_validates(fake_vamp_modules) -> None: + """VAMP planner uses configured algorithm and VAMP-native path utilities.""" + vamp_module, robot_module = fake_vamp_modules + world = finalized_vamp_world() + robot_id = world.get_robot_ids()[0] + planner = VampPlanner(VampPlannerConfig(algorithm="prm", simplify=True, validate_path=True)) + start = JointState( + name=["joint1", "joint2", "joint3", "gripper"], + position=[0.0, 0.0, 0.0, 0.9], + ) + goal = JointState( + name=["joint1", "joint2", "joint3", "gripper"], + position=[1.0, 0.5, 0.25, 0.1], + ) + + result = planner.plan_joint_path(world, robot_id, start, goal, timeout=0.25) + + assert result.status == PlanningStatus.SUCCESS + assert [point.position for point in result.path] == [[0.0, 0.0, 0.0], [1.0, 0.5, 0.25]] + assert [point.name for point in result.path] == [["joint1", "joint2", "joint3"]] * 2 + assert vamp_module.configure_calls == [("panda", "prm", 250)] + assert vamp_module.planner_calls == [ + ([0.0, 0.0, 0.0], [1.0, 0.5, 0.25], FakeSampler("fake_sampler")) + ] + assert len(robot_module.simplify_calls) == 1 + assert robot_module.motion_calls == [([0.0, 0.0, 0.0], [1.0, 0.5, 0.25], True)] + + +def test_vamp_planner_reports_native_planning_failure(fake_vamp_modules) -> None: + """Unsolved VAMP results map to a failed DimOS planning result.""" + vamp_module, _ = fake_vamp_modules + vamp_module.planner_solved = False + world = finalized_vamp_world() + robot_id = world.get_robot_ids()[0] + planner = VampPlanner(VampPlannerConfig(simplify=False, validate_path=False)) + + result = planner.plan_joint_path( + world, + robot_id, + JointState(name=["joint1", "joint2", "joint3"], position=[0.0, 0.0, 0.0]), + JointState(name=["joint1", "joint2", "joint3"], position=[1.0, 0.5, 0.25]), + ) + + assert result.status == PlanningStatus.NO_SOLUTION + assert "did not find a path" in result.message diff --git a/dimos/manipulation/planning/world/config.py b/dimos/manipulation/planning/world/config.py new file mode 100644 index 0000000000..d0b320bff6 --- /dev/null +++ b/dimos/manipulation/planning/world/config.py @@ -0,0 +1,78 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Configuration models for manipulation world backends.""" + +from __future__ import annotations + +from pathlib import Path +from typing import Annotated, Literal + +from pydantic import Field, TypeAdapter + +from dimos.protocol.service.spec import BaseConfig + + +class DrakeWorldConfig(BaseConfig): + """Configuration for the default Drake world backend.""" + + backend: Literal["drake"] = "drake" + + +class OfficialVampArtifactConfig(BaseConfig): + """Load a robot artifact exposed by the installed VAMP package.""" + + mode: Literal["official"] = "official" + robot: str = "panda" + + +class CustomVampArtifactConfig(BaseConfig): + """Load a user-prepared VAMP robot artifact from an explicit local path.""" + + mode: Literal["custom"] = "custom" + path: Path + + +VampArtifactConfig = Annotated[ + OfficialVampArtifactConfig | CustomVampArtifactConfig, + Field(discriminator="mode"), +] + + +class VampWorldConfig(BaseConfig): + """Configuration for the VAMP-native world backend.""" + + backend: Literal["vamp"] = "vamp" + artifact: VampArtifactConfig = Field(default_factory=OfficialVampArtifactConfig) + + +ManipulationWorldConfig = Annotated[ + DrakeWorldConfig | VampWorldConfig, + Field(discriminator="backend"), +] + +MANIPULATION_WORLD_CONFIG_ADAPTER: TypeAdapter[ManipulationWorldConfig] = TypeAdapter( + ManipulationWorldConfig +) + + +__all__ = [ + "MANIPULATION_WORLD_CONFIG_ADAPTER", + "CustomVampArtifactConfig", + "DrakeWorldConfig", + "ManipulationWorldConfig", + "OfficialVampArtifactConfig", + "VampArtifactConfig", + "VampWorldConfig", +] diff --git a/dimos/manipulation/planning/world/vamp_world.py b/dimos/manipulation/planning/world/vamp_world.py new file mode 100644 index 0000000000..be33a26d44 --- /dev/null +++ b/dimos/manipulation/planning/world/vamp_world.py @@ -0,0 +1,418 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""VAMP-native WorldSpec implementation.""" + +from __future__ import annotations + +from contextlib import contextmanager +from copy import deepcopy +from dataclasses import dataclass +from itertools import pairwise +import time +from typing import TYPE_CHECKING, Protocol + +import numpy as np +from scipy.spatial.transform import Rotation as R + +try: + import vamp +except ImportError: + pass + +from dimos.manipulation.planning.planners.config import VampPlannerConfig +from dimos.manipulation.planning.spec.config import RobotModelConfig +from dimos.manipulation.planning.spec.enums import ObstacleType, PlanningStatus +from dimos.manipulation.planning.spec.models import Obstacle, PlanningResult, WorldRobotID +from dimos.manipulation.planning.spec.protocols import WorldSpec +from dimos.manipulation.planning.utils.path_utils import compute_path_length +from dimos.manipulation.planning.vamp.errors import UnsupportedWorldCapabilityError +from dimos.manipulation.planning.vamp.loader import load_vamp_robot_module, require_vamp +from dimos.manipulation.planning.world.config import VampWorldConfig +from dimos.msgs.geometry_msgs.PoseStamped import PoseStamped +from dimos.msgs.sensor_msgs.JointState import JointState +from dimos.utils.transform_utils import matrix_to_pose + +if TYPE_CHECKING: + from collections.abc import Generator + + from numpy.typing import NDArray + + +class _VampContext(Protocol): + """Typed context shape used by the VAMP world adapter.""" + + # TODO: Replace the loose WorldSpec context payload with a typed ContextProtocol. + joint_state: JointState + + +@dataclass +class _SingleRobotVampContext(_VampContext): + """Concrete VAMP context for the current single-robot backend.""" + + joint_state: JointState + + +class VampWorld(WorldSpec): + """World adapter for VAMP-native robot artifacts and validity checking.""" + + def __init__(self, config: VampWorldConfig) -> None: + self.config = config + require_vamp() + self._robot_module = load_vamp_robot_module(config.artifact) + self._environment = vamp.Environment() + self._robot_id: WorldRobotID | None = None + self._robot_config: RobotModelConfig | None = None + self._live_joint_state: JointState | None = None + self._obstacles: dict[str, Obstacle] = {} + self._finalized = False + + def add_robot(self, config: RobotModelConfig) -> WorldRobotID: + """Add a robot to the VAMP world.""" + if self._finalized: + raise RuntimeError("Cannot add robot after world is finalized") + if self._robot_config is not None: + raise ValueError("VAMP world currently supports one robot per world") + robot_id = "robot_1" + self._robot_id = robot_id + self._robot_config = config + home_positions = config.home_joints or [0.0] * len(config.joint_names) + self._live_joint_state = JointState( + name=config.joint_names, + position=home_positions, + ) + return robot_id + + def get_robot_ids(self) -> list[WorldRobotID]: + """Get all robot IDs.""" + if self._robot_id is None: + return [] + return [self._robot_id] + + def get_robot_config(self, robot_id: WorldRobotID) -> RobotModelConfig: + """Get robot configuration.""" + self._assert_robot_id(robot_id) + if self._robot_config is None: + raise RuntimeError("VAMP world has no robot config") + return self._robot_config + + def get_joint_limits( + self, robot_id: WorldRobotID + ) -> tuple[NDArray[np.float64], NDArray[np.float64]]: + """Get joint limits from config or conservative defaults.""" + config = self.get_robot_config(robot_id) + if config.joint_limits_lower is not None and config.joint_limits_upper is not None: + return ( + np.array(config.joint_limits_lower, dtype=np.float64), + np.array(config.joint_limits_upper, dtype=np.float64), + ) + n_joints = len(config.joint_names) + return (np.full(n_joints, -np.pi), np.full(n_joints, np.pi)) + + def add_obstacle(self, obstacle: Obstacle) -> str: + """Add an obstacle and rebuild the VAMP environment.""" + self._obstacles[obstacle.name] = obstacle + self._rebuild_environment() + return obstacle.name + + def remove_obstacle(self, obstacle_id: str) -> bool: + """Remove an obstacle.""" + existed = obstacle_id in self._obstacles + self._obstacles.pop(obstacle_id, None) + if existed: + self._rebuild_environment() + return existed + + def update_obstacle_pose(self, obstacle_id: str, pose: PoseStamped) -> bool: + """Update an obstacle pose.""" + if obstacle_id not in self._obstacles: + return False + self._obstacles[obstacle_id].pose = pose + self._rebuild_environment() + return True + + def clear_obstacles(self) -> None: + """Remove all obstacles.""" + self._obstacles.clear() + self._rebuild_environment() + + def get_obstacles(self) -> list[Obstacle]: + """Get all obstacles.""" + return list(self._obstacles.values()) + + def finalize(self) -> None: + """Finalize the VAMP world.""" + self._finalized = True + + @property + def is_finalized(self) -> bool: + """Check if the world is finalized.""" + return self._finalized + + def get_live_context(self) -> _VampContext: + """Get the live VAMP context.""" + return _SingleRobotVampContext(self._require_live_joint_state()) + + @contextmanager + def scratch_context(self) -> Generator[_VampContext, None, None]: + """Get a scratch context with copied joint states.""" + yield _SingleRobotVampContext(deepcopy(self._require_live_joint_state())) + + def sync_from_joint_state(self, robot_id: WorldRobotID, joint_state: JointState) -> None: + """Sync live state from a joint-state message.""" + self._assert_robot_id(robot_id) + self._live_joint_state = self._joint_state_for_robot_order(robot_id, joint_state) + + def set_joint_state( + self, ctx: _VampContext, robot_id: WorldRobotID, joint_state: JointState + ) -> None: + """Set robot joint state in a context.""" + self._assert_robot_id(robot_id) + ctx.joint_state = self._joint_state_for_robot_order(robot_id, joint_state) + + def get_joint_state(self, ctx: _VampContext, robot_id: WorldRobotID) -> JointState: + """Get robot joint state from a context.""" + self._assert_robot_id(robot_id) + return ctx.joint_state + + def is_collision_free(self, ctx: _VampContext, robot_id: WorldRobotID) -> bool: + """Check if current configuration is valid according to VAMP.""" + self._assert_robot_id(robot_id) + return self._validate_state(ctx.joint_state, check_bounds=True) + + def get_min_distance(self, ctx: _VampContext, robot_id: WorldRobotID) -> float: + """Minimum distance is not exposed by VAMP's Python API.""" + raise UnsupportedWorldCapabilityError("vamp", "minimum distance query") + + def check_config_collision_free(self, robot_id: WorldRobotID, joint_state: JointState) -> bool: + """Check a joint state using VAMP native validation.""" + return self._validate_state( + self._joint_state_for_robot_order(robot_id, joint_state), check_bounds=True + ) + + def check_edge_collision_free( + self, + robot_id: WorldRobotID, + start: JointState, + end: JointState, + step_size: float = 0.05, + ) -> bool: + """Check an edge using VAMP native motion validation.""" + del step_size + start_state = self._joint_state_for_robot_order(robot_id, start) + end_state = self._joint_state_for_robot_order(robot_id, end) + result = self._robot_module.validate_motion( + list(start_state.position), + list(end_state.position), + self._environment, + True, + ) + return bool(result) + + def get_ee_pose(self, ctx: _VampContext, robot_id: WorldRobotID) -> PoseStamped: + """Get end-effector pose from VAMP eefk.""" + self._assert_robot_id(robot_id) + joint_state = ctx.joint_state + transform = np.asarray( + self._robot_module.eefk(list(joint_state.position)), + dtype=np.float64, + ) + pose = matrix_to_pose(transform) + return PoseStamped(position=pose.position, orientation=pose.orientation, frame_id="world") + + def get_link_pose( + self, ctx: _VampContext, robot_id: WorldRobotID, link_name: str + ) -> NDArray[np.float64]: + """Return EE pose only when the requested link is the configured EE link.""" + config = self.get_robot_config(robot_id) + if link_name != config.end_effector_link: + raise UnsupportedWorldCapabilityError("vamp", f"link pose for '{link_name}'") + joint_state = ctx.joint_state + return np.asarray( + self._robot_module.eefk(list(joint_state.position)), + dtype=np.float64, + ) + + def get_jacobian(self, ctx: _VampContext, robot_id: WorldRobotID) -> NDArray[np.float64]: + """VAMP's Python API does not expose a Jacobian.""" + raise UnsupportedWorldCapabilityError("vamp", "end-effector Jacobian") + + def plan_joint_path( + self, + planner_config: VampPlannerConfig, + robot_id: WorldRobotID, + start: JointState, + goal: JointState, + timeout: float = 10.0, + ) -> PlanningResult: + """Plan a VAMP-native joint-space path inside the VAMP world adapter.""" + start_time = time.time() + if not self.is_finalized: + return _failure(PlanningStatus.NO_SOLUTION, "World must be finalized before planning") + if robot_id != self._robot_id: + return _failure(PlanningStatus.NO_SOLUTION, f"Robot '{robot_id}' not found") + + if not self.check_config_collision_free(robot_id, start): + return _failure(PlanningStatus.COLLISION_AT_START, "Start configuration is invalid") + if not self.check_config_collision_free(robot_id, goal): + return _failure(PlanningStatus.COLLISION_AT_GOAL, "Goal configuration is invalid") + start_state = self._joint_state_for_robot_order(robot_id, start) + goal_state = self._joint_state_for_robot_order(robot_id, goal) + + robot_module, planner_func, plan_settings, simplify_settings = ( + vamp.configure_robot_and_planner_with_kwargs( + self._robot_name(), + planner_config.algorithm, + max_iterations=_timeout_to_iteration_budget(timeout), + ) + ) + sampler = robot_module.halton() + result = planner_func( + list(start_state.position), + list(goal_state.position), + self._environment, + plan_settings, + sampler, + ) + if not result.solved: + return _failure( + PlanningStatus.NO_SOLUTION, + "VAMP planner did not find a path", + planning_time=time.time() - start_time, + iterations=result.iterations, + ) + + path_source = result.path + if planner_config.simplify: + simplified = robot_module.simplify( + path_source, self._environment, simplify_settings, sampler + ) + if simplified.solved: + path_source = simplified.path + + path_array = np.asarray(path_source.numpy(), dtype=np.float64) + joint_names = start_state.name or self.get_robot_config(robot_id).joint_names + path = [ + JointState(name=joint_names, position=row.astype(float).tolist()) for row in path_array + ] + if planner_config.validate_path and not self._validate_path(robot_id, path): + return _failure( + PlanningStatus.NO_SOLUTION, + "VAMP returned a path that failed native validation", + planning_time=time.time() - start_time, + ) + return PlanningResult( + status=PlanningStatus.SUCCESS, + path=path, + planning_time=time.time() - start_time, + path_length=compute_path_length(path), + iterations=result.iterations, + message="VAMP planning succeeded", + ) + + def _joint_state_for_robot_order( + self, robot_id: WorldRobotID, joint_state: JointState + ) -> JointState: + """Return a joint state truncated to VAMP's configured robot joint order.""" + config = self.get_robot_config(robot_id) + positions = list(joint_state.position[: len(config.joint_names)]) + names = list(joint_state.name[: len(positions)]) if joint_state.name else config.joint_names + return JointState(name=names, position=positions) + + def _robot_name(self) -> str: + if self.config.artifact.mode == "official": + return self.config.artifact.robot + return self._robot_module.__name__.split(".")[-1] + + def _assert_robot_id(self, robot_id: WorldRobotID) -> None: + if robot_id != self._robot_id: + raise KeyError(robot_id) + + def _require_live_joint_state(self) -> JointState: + if self._live_joint_state is None: + raise RuntimeError("VAMP world has no robot joint state") + return self._live_joint_state + + def _validate_path(self, robot_id: WorldRobotID, path: list[JointState]) -> bool: + if not path: + return False + return all( + self.check_edge_collision_free(robot_id, before, after) + for before, after in pairwise(path) + ) + + def _validate_state(self, joint_state: JointState, check_bounds: bool) -> bool: + return bool( + self._robot_module.validate( + list(joint_state.position), + self._environment, + check_bounds, + ) + ) + + def _rebuild_environment(self) -> None: + require_vamp() + self._environment = vamp.Environment() + for obstacle in self._obstacles.values(): + self._add_obstacle_to_environment(obstacle) + + def _add_obstacle_to_environment(self, obstacle: Obstacle) -> None: + center = [obstacle.pose.position.x, obstacle.pose.position.y, obstacle.pose.position.z] + euler_xyz = ( + R.from_quat( + [ + obstacle.pose.orientation.x, + obstacle.pose.orientation.y, + obstacle.pose.orientation.z, + obstacle.pose.orientation.w, + ] + ) + .as_euler("xyz") + .tolist() + ) + require_vamp() + if obstacle.obstacle_type == ObstacleType.SPHERE: + self._environment.add_sphere(vamp.Sphere(center, obstacle.dimensions[0])) + elif obstacle.obstacle_type == ObstacleType.BOX: + half_extents = [dimension / 2.0 for dimension in obstacle.dimensions] + self._environment.add_cuboid(vamp.Cuboid(center, euler_xyz, half_extents)) + elif obstacle.obstacle_type == ObstacleType.CYLINDER: + self._environment.add_capsule( + vamp.Cylinder( + center, + euler_xyz, + obstacle.dimensions[0], + obstacle.dimensions[1], + ) + ) + else: + raise UnsupportedWorldCapabilityError("vamp", f"{obstacle.obstacle_type.name} obstacle") + + +def _timeout_to_iteration_budget(timeout: float) -> int: + return max(1, int(timeout * 1000)) + + +def _failure( + status: PlanningStatus, + message: str, + planning_time: float = 0.0, + iterations: int = 0, +) -> PlanningResult: + return PlanningResult( + status=status, + planning_time=planning_time, + iterations=iterations, + message=message, + ) diff --git a/dimos/manipulation/test_manipulation_unit.py b/dimos/manipulation/test_manipulation_unit.py index 0f84bed419..17889d1d0a 100644 --- a/dimos/manipulation/test_manipulation_unit.py +++ b/dimos/manipulation/test_manipulation_unit.py @@ -17,11 +17,11 @@ from __future__ import annotations from pathlib import Path -import threading from unittest.mock import MagicMock, patch import pytest +from dimos.core.module import Module from dimos.manipulation.manipulation_module import ( ManipulationModule, ManipulationModuleConfig, @@ -29,10 +29,13 @@ ) from dimos.manipulation.planning.kinematics.config import PinkKinematicsConfig from dimos.manipulation.planning.monitor.world_monitor import WorldMonitor +from dimos.manipulation.planning.planners.config import RRTConnectPlannerConfig, VampPlannerConfig from dimos.manipulation.planning.spec.config import RobotModelConfig from dimos.manipulation.planning.spec.enums import IKStatus from dimos.manipulation.planning.spec.models import IKResult from dimos.manipulation.planning.spec.protocols import VisualizationSpec +from dimos.manipulation.planning.vamp.errors import UnsupportedWorldCapabilityError +from dimos.manipulation.planning.world.config import DrakeWorldConfig, VampWorldConfig from dimos.msgs.geometry_msgs.Pose import Pose from dimos.msgs.geometry_msgs.PoseStamped import PoseStamped from dimos.msgs.geometry_msgs.Quaternion import Quaternion @@ -94,20 +97,9 @@ def simple_trajectory(): def _make_module(): - """Create a ManipulationModule instance with mocked __init__.""" - with patch.object(ManipulationModule, "__init__", lambda self: None): - module = ManipulationModule.__new__(ManipulationModule) - module._state = ManipulationState.IDLE - module._lock = threading.Lock() - module._error_message = "" - module._robots = {} - module._planned_paths = {} - module._planned_trajectories = {} - module._world_monitor = None - module._planner = None - module._kinematics = None - module._coordinator_client = None - return module + """Create a ManipulationModule while skipping only base Module side effects.""" + with patch.object(Module, "__init__", return_value=None): + return ManipulationModule() class TestStateMachine: @@ -202,7 +194,7 @@ def test_multiple_robots_require_name(self, robot_config): class TestPlanningInitialization: """Test planning backend configuration wiring.""" - def test_kinematics_config_is_passed_to_factory(self, robot_config): + def test_kinematics_config_is_passed_to_factory(self, robot_config, mocker): """ManipulationModule config selects the requested IK backend.""" module = _make_module() kinematics = PinkKinematicsConfig(max_iterations=100, dt=0.02) @@ -214,42 +206,44 @@ def test_kinematics_config_is_passed_to_factory(self, robot_config): mock_world_monitor = MagicMock(spec=WorldMonitor) mock_world_monitor.add_robot.return_value = "robot_id" - with ( - patch( - "dimos.manipulation.manipulation_module.WorldMonitor", - return_value=mock_world_monitor, - ), - patch("dimos.manipulation.manipulation_module.JointTrajectoryGenerator"), - patch("dimos.manipulation.manipulation_module.create_planner") as mock_planner, - patch("dimos.manipulation.manipulation_module.create_kinematics") as mock_kinematics, - ): - module._initialize_planning() + mocker.patch( + "dimos.manipulation.manipulation_module.WorldMonitor", + return_value=mock_world_monitor, + ) + mocker.patch("dimos.manipulation.manipulation_module.JointTrajectoryGenerator") + mock_planner = mocker.patch("dimos.manipulation.manipulation_module.create_planner") + mock_kinematics = mocker.patch("dimos.manipulation.manipulation_module.create_kinematics") + + module._initialize_planning() - mock_planner.assert_called_once_with(name="rrt_connect") + mock_planner.assert_called_once() + planner_config = mock_planner.call_args.kwargs["config"] + assert isinstance(planner_config, RRTConnectPlannerConfig) mock_kinematics.assert_called_once_with(config=kinematics) - def test_legacy_kinematics_name_still_selects_backend(self, robot_config): + def test_legacy_kinematics_name_still_selects_backend(self, robot_config, mocker): """The old kinematics_name field remains a compatibility shim.""" module = _make_module() - module.config = ManipulationModuleConfig( - robots=[robot_config], - kinematics_name="pink", - enable_viz=False, - ) + with pytest.warns(DeprecationWarning, match="kinematics_name is deprecated"): + module.config = ManipulationModuleConfig( + robots=[robot_config], + kinematics_name="pink", + enable_viz=False, + ) mock_world_monitor = MagicMock(spec=WorldMonitor) mock_world_monitor.add_robot.return_value = "robot_id" - with ( - patch( - "dimos.manipulation.manipulation_module.WorldMonitor", - return_value=mock_world_monitor, - ), - patch("dimos.manipulation.manipulation_module.JointTrajectoryGenerator"), - patch("dimos.manipulation.manipulation_module.create_planner"), - patch("dimos.manipulation.manipulation_module.create_kinematics") as mock_kinematics, - ): - module._initialize_planning() + mocker.patch( + "dimos.manipulation.manipulation_module.WorldMonitor", + return_value=mock_world_monitor, + ) + mocker.patch("dimos.manipulation.manipulation_module.JointTrajectoryGenerator") + mocker.patch("dimos.manipulation.manipulation_module.create_planner") + mock_kinematics = mocker.patch("dimos.manipulation.manipulation_module.create_kinematics") + module._initialize_planning() + + mock_kinematics.assert_called_once() call_config = mock_kinematics.call_args.kwargs["config"] assert isinstance(call_config, PinkKinematicsConfig) @@ -269,6 +263,69 @@ def test_nested_kinematics_config_parses_cli_override_shape(self) -> None: assert config.kinematics.dt == 0.02 assert config.kinematics.posture_cost == 0.0 + def test_nested_world_and_planner_config_parses_cli_override_shape(self) -> None: + """Pydantic parses nested world/planner config shapes used by -o overrides.""" + config = ManipulationModuleConfig( + world={ + "backend": "vamp", + "artifact": { + "mode": "official", + "robot": "panda", + }, + }, + planner={ + "backend": "vamp", + "algorithm": "prm", + "simplify": "false", + "validate_path": "true", + }, + ) + + assert isinstance(config.world, VampWorldConfig) + assert config.world.artifact.mode == "official" + assert config.world.artifact.robot == "panda" + assert isinstance(config.planner, VampPlannerConfig) + assert config.planner.algorithm == "prm" + assert config.planner.simplify is False + assert config.planner.validate_path is True + + def test_default_world_and_planner_config_preserves_drake_rrt_behavior(self) -> None: + """Default config remains Drake world plus RRT-Connect planner.""" + config = ManipulationModuleConfig() + + assert isinstance(config.world, DrakeWorldConfig) + assert isinstance(config.planner, RRTConnectPlannerConfig) + assert config.kinematics.backend == "jacobian" + + def test_legacy_planner_name_still_selects_backend(self) -> None: + """The old planner_name field remains a noisy compatibility shim.""" + with pytest.warns(DeprecationWarning, match="planner_name is deprecated"): + config = ManipulationModuleConfig(planner_name="vamp") + + assert isinstance(config.planner, VampPlannerConfig) + + def test_vamp_planner_requires_vamp_world(self, robot_config) -> None: + """Planning initialization fails early for invalid VAMP planner pairing.""" + module = _make_module() + module.config = ManipulationModuleConfig( + robots=[robot_config], + planner={"backend": "vamp"}, + ) + + with pytest.raises(ValueError, match="VAMP planner requires world backend 'vamp'"): + module._initialize_planning() + + def test_vamp_world_requires_vamp_planner(self, robot_config) -> None: + """Planning initialization fails early for invalid VAMP world pairing.""" + module = _make_module() + module.config = ManipulationModuleConfig( + robots=[robot_config], + world={"backend": "vamp"}, + ) + + with pytest.raises(ValueError, match="VAMP world backend requires planner backend 'vamp'"): + module._initialize_planning() + def test_solve_ik_rpc_calls_configured_backend(self, robot_config): """solve_ik returns the backend IKResult without path planning.""" module = _make_module() @@ -341,6 +398,29 @@ def test_solve_ik_rpc_uses_explicit_seed(self, robot_config): assert kwargs["seed"] is explicit_seed module._world_monitor.get_current_joint_state.assert_not_called() + def test_solve_ik_rpc_reports_unsupported_world_capability(self, robot_config): + """Pose planning surfaces incompatible world/kinematics capabilities clearly.""" + module = _make_module() + module._robots = {"test_arm": ("robot_id", robot_config, MagicMock())} + module._world_monitor = MagicMock() + module._world_monitor.world = MagicMock() + module._world_monitor.get_current_joint_state.return_value = JointState( + name=robot_config.joint_names, position=[0.0, 0.0, 0.0] + ) + module._kinematics = MagicMock() + module._kinematics.solve.side_effect = UnsupportedWorldCapabilityError( + "vamp", "end-effector Jacobian" + ) + + pose = Pose(position=Vector3(x=0.45, y=0.0, z=0.25), orientation=Quaternion()) + result = module.solve_ik(pose) + + assert result.status == IKStatus.NO_SOLUTION + assert "end-effector Jacobian" in result.message + assert module._state == ManipulationState.IDLE + assert module._planned_paths == {} + module._kinematics.solve.assert_called_once() + class TestJointNameTranslation: """Test trajectory joint name translation for coordinator.""" @@ -585,6 +665,9 @@ def test_no_monitor_returns_early(self, robot_config_with_mapping): ) module._on_joint_state(msg) + assert module._world_monitor is None + assert module._init_joints == {} + class TestWorldMonitorVisualization: def test_visualization_routing_and_stop_all_monitors(self): @@ -629,6 +712,9 @@ def test_dismiss_preview_noop_without_monitor(self): module._dismiss_preview("robot_id") + assert module._world_monitor is None + assert module._state == ManipulationState.IDLE + def test_dismiss_preview_routes_to_monitor(self): module = _make_module() module._world_monitor = MagicMock() diff --git a/dimos/robot/all_blueprints.py b/dimos/robot/all_blueprints.py index 291af8540e..b8da78094b 100644 --- a/dimos/robot/all_blueprints.py +++ b/dimos/robot/all_blueprints.py @@ -74,6 +74,7 @@ "mid360-pointlio-voxels": "dimos.hardware.sensors.lidar.pointlio.pointlio_blueprints:mid360_pointlio_voxels", "openarm-mock-planner-coordinator": "dimos.robot.manipulators.openarm.blueprints:openarm_mock_planner_coordinator", "openarm-planner-coordinator": "dimos.robot.manipulators.openarm.blueprints:openarm_planner_coordinator", + "panda-coordinator": "dimos.robot.manipulators.franka.blueprints:panda_coordinator", "path-planner-eval": "dimos.navigation.nav_3d.evaluator.blueprints:path_planner_eval", "teleop-hosted-go2": "dimos.teleop.quest_hosted.blueprints:teleop_hosted_go2", "teleop-hosted-xarm7": "dimos.teleop.quest_hosted.blueprints:teleop_hosted_xarm7", diff --git a/dimos/robot/catalog/franka.py b/dimos/robot/catalog/franka.py new file mode 100644 index 0000000000..8bf62d934b --- /dev/null +++ b/dimos/robot/catalog/franka.py @@ -0,0 +1,69 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Franka Panda robot configurations.""" + +from __future__ import annotations + +from dimos.robot.config import RobotConfig +from dimos.utils.data import LfsPath + +# LFS-backed: data/.lfs/franka_description.tar.gz extracts to data/franka_description/. +_FRANKA_DESCRIPTION_PKG = LfsPath("franka_description") + +# Keep URDF/SRDF paths explicit so VAMP tests/benchmarks can validate that the +# DimOS robot model matches the official VAMP Panda artifact joint order. +FRANKA_PANDA_MODEL = _FRANKA_DESCRIPTION_PKG / "urdf/panda.urdf.xacro" +FRANKA_PANDA_FK_MODEL = _FRANKA_DESCRIPTION_PKG / "urdf/panda.urdf" +FRANKA_PANDA_SRDF = _FRANKA_DESCRIPTION_PKG / "srdf/panda.srdf" + +FRANKA_PANDA_JOINT_NAMES = [f"panda_joint{i}" for i in range(1, 8)] +FRANKA_PANDA_HOME_JOINTS = [0.0, -0.7853981634, 0.0, -2.35619449, 0.0, 1.5707963268, 0.7853981634] + + +def franka_panda( + name: str = "panda", + *, + adapter_type: str = "mock", + address: str | None = None, +) -> RobotConfig: + """Franka Panda config for mock-control planning tests and benchmarks.""" + return RobotConfig( + name=name, + model_path=FRANKA_PANDA_MODEL, + end_effector_link="panda_hand", + adapter_type=adapter_type, + address=address, + joint_names=FRANKA_PANDA_JOINT_NAMES, + base_link="panda_link0", + home_joints=FRANKA_PANDA_HOME_JOINTS, + package_paths={ + "franka_description": _FRANKA_DESCRIPTION_PKG, + "moveit_resources_panda_description": _FRANKA_DESCRIPTION_PKG, + }, + auto_convert_meshes=True, + max_velocity=1.0, + max_acceleration=2.0, + adapter_kwargs={"srdf_path": FRANKA_PANDA_SRDF}, + ) + + +__all__ = [ + "FRANKA_PANDA_FK_MODEL", + "FRANKA_PANDA_HOME_JOINTS", + "FRANKA_PANDA_JOINT_NAMES", + "FRANKA_PANDA_MODEL", + "FRANKA_PANDA_SRDF", + "franka_panda", +] diff --git a/dimos/robot/catalog/test_franka.py b/dimos/robot/catalog/test_franka.py new file mode 100644 index 0000000000..578685852c --- /dev/null +++ b/dimos/robot/catalog/test_franka.py @@ -0,0 +1,91 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tests for the Franka Panda robot catalog.""" + +from __future__ import annotations + +from dimos.control.components import HardwareType +from dimos.robot.catalog.franka import ( + FRANKA_PANDA_FK_MODEL, + FRANKA_PANDA_JOINT_NAMES, + FRANKA_PANDA_MODEL, + FRANKA_PANDA_SRDF, + franka_panda, +) +from dimos.utils.data import LfsPath + + +def test_franka_panda_catalog_defaults_to_mock_control() -> None: + """The Panda catalog config is mock-control first.""" + config = franka_panda() + + assert config.name == "panda" + assert config.adapter_type == "mock" + assert config.address is None + assert config.joint_names == FRANKA_PANDA_JOINT_NAMES + assert config.end_effector_link == "panda_hand" + assert config.base_link == "panda_link0" + + +def test_franka_panda_uses_lfs_backed_model_and_srdf_paths() -> None: + """Panda URDF/SRDF resources follow the repo LFS-backed data pattern.""" + assert isinstance(FRANKA_PANDA_MODEL, LfsPath) + assert isinstance(FRANKA_PANDA_FK_MODEL, LfsPath) + assert isinstance(FRANKA_PANDA_SRDF, LfsPath) + + +def test_franka_panda_hardware_component_uses_mock_adapter_and_prefixed_joints() -> None: + """RobotConfig conversion feeds ControlCoordinator mock hardware.""" + config = franka_panda() + + component = config.to_hardware_component() + + assert component.hardware_id == "panda" + assert component.hardware_type == HardwareType.MANIPULATOR + assert component.adapter_type == "mock" + assert component.joints == [f"panda/{joint}" for joint in FRANKA_PANDA_JOINT_NAMES] + assert component.adapter_kwargs["initial_positions"] == config.home_joints + + +def test_franka_panda_robot_model_config_preserves_vamp_joint_order() -> None: + """Manipulation robot model config keeps the official Panda joint order.""" + config = franka_panda() + + model = config.to_robot_model_config() + + assert model.name == "panda" + assert isinstance(model.model_path, LfsPath) + assert model.joint_names == FRANKA_PANDA_JOINT_NAMES + assert model.joint_name_mapping == { + f"panda/{joint}": joint for joint in FRANKA_PANDA_JOINT_NAMES + } + + +def test_franka_panda_task_config_supports_mock_coordinator_benchmark_path() -> None: + """Catalog output can build the same mock coordinator task shape as xArm/Piper.""" + config = franka_panda() + + task = config.to_task_config( + task_type="cartesian_ik", + task_name="cartesian_ik_panda", + model_path=FRANKA_PANDA_FK_MODEL, + ee_joint_id=config.dof, + ) + + assert task.name == "cartesian_ik_panda" + assert task.type == "cartesian_ik" + assert task.joint_names == [f"panda/{joint}" for joint in FRANKA_PANDA_JOINT_NAMES] + assert isinstance(task.params["model_path"], LfsPath) + assert task.params["ee_joint_id"] == 7 diff --git a/dimos/robot/config.py b/dimos/robot/config.py index d0922cb842..9c7e292beb 100644 --- a/dimos/robot/config.py +++ b/dimos/robot/config.py @@ -82,7 +82,6 @@ class RobotConfig(BaseModel): joint_prefix: str | None = None # defaults to "{name}_" base_pose: list[float] = Field(default_factory=lambda: [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0]) - # Planning max_velocity: float = 1.0 max_acceleration: float = 2.0 pre_grasp_offset: float = 0.10 diff --git a/dimos/robot/manipulators/franka/blueprints.py b/dimos/robot/manipulators/franka/blueprints.py new file mode 100644 index 0000000000..75abc25cbe --- /dev/null +++ b/dimos/robot/manipulators/franka/blueprints.py @@ -0,0 +1,58 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Mock-control blueprint for Franka Panda planning. + +Usage: + uv run --extra manipulation --extra vamp dimos run panda-coordinator \ + -o manipulationmodule.world.backend=vamp \ + -o manipulationmodule.world.artifact.mode=official \ + -o manipulationmodule.world.artifact.robot=panda \ + -o manipulationmodule.planner.backend=vamp \ + -o manipulationmodule.planner.algorithm=rrtc +""" + +from dimos.control.coordinator import ControlCoordinator +from dimos.core.coordination.blueprints import autoconnect +from dimos.manipulation.manipulation_module import ManipulationModule +from dimos.robot.catalog.franka import FRANKA_PANDA_FK_MODEL, franka_panda + +_panda_cfg = franka_panda(name="panda") + +panda_coordinator = autoconnect( + ManipulationModule.blueprint( + robots=[_panda_cfg.to_robot_model_config()], + planning_timeout=10.0, + enable_viz=False, + ), + ControlCoordinator.blueprint( + tick_rate=100.0, + publish_joint_state=True, + joint_state_frame_id="coordinator", + hardware=[_panda_cfg.to_hardware_component()], + tasks=[ + _panda_cfg.to_task_config( + task_type="cartesian_ik", + task_name="cartesian_ik_panda", + model_path=FRANKA_PANDA_FK_MODEL, + ee_joint_id=_panda_cfg.dof, + ), + ], + ), +) + +# Alias matching existing xArm naming style. +panda_planner_coordinator = panda_coordinator + +__all__ = ["panda_coordinator", "panda_planner_coordinator"] diff --git a/dimos/robot/manipulators/franka/test_blueprints.py b/dimos/robot/manipulators/franka/test_blueprints.py new file mode 100644 index 0000000000..4ec8a9b7e8 --- /dev/null +++ b/dimos/robot/manipulators/franka/test_blueprints.py @@ -0,0 +1,53 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tests for Franka Panda blueprints.""" + +from __future__ import annotations + +from dimos.robot.all_blueprints import all_blueprints +from dimos.robot.manipulators.franka.blueprints import panda_coordinator + + +def test_panda_coordinator_is_registered() -> None: + assert ( + all_blueprints["panda-coordinator"] + == "dimos.robot.manipulators.franka.blueprints:panda_coordinator" + ) + + +def test_panda_coordinator_accepts_vamp_cli_override_shape() -> None: + config = panda_coordinator.config()( + manipulationmodule={ + "world": { + "backend": "vamp", + "artifact": {"mode": "official", "robot": "panda"}, + }, + "planner": { + "backend": "vamp", + "algorithm": "rrtc", + "simplify": "true", + "validate_path": "true", + }, + } + ) + + assert config.manipulationmodule is not None + module_config = config.manipulationmodule + assert module_config.world.backend == "vamp" + assert module_config.world.artifact.robot == "panda" + assert module_config.planner.backend == "vamp" + assert module_config.planner.algorithm == "rrtc" + assert module_config.planner.simplify is True + assert module_config.planner.validate_path is True diff --git a/pyproject.toml b/pyproject.toml index 55e7ffcb73..0edb934b42 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -275,6 +275,12 @@ manipulation = [ "pyyaml>=6.0", ] +vamp = [ + # Optional VAMP planning backend. Keep out of broad extras to avoid forcing + # native planner bindings into non-VAMP manipulation environments. + "vamp-planner @ git+https://github.com/TomCC7/vamp.git@2b59552179c2af473c59c8617519e4663497fbe6", +] + cpu = [ # CPU inference backends "onnxruntime", diff --git a/stubs/vamp/__init__.pyi b/stubs/vamp/__init__.pyi new file mode 100644 index 0000000000..71ce378efa --- /dev/null +++ b/stubs/vamp/__init__.pyi @@ -0,0 +1,84 @@ +from collections.abc import Callable, Sequence + +import numpy as np +from numpy.typing import NDArray + +from . import baxter as baxter, fetch as fetch, panda as panda, sphere as sphere, ur5 as ur5 + +class Path: + def numpy(self) -> NDArray[np.float64]: ... + +class Environment: + def add_sphere(self, sphere: Sphere) -> None: ... + def add_cuboid(self, cuboid: Cuboid) -> None: ... + def add_capsule(self, capsule: Cylinder) -> None: ... + +class Sphere: + def __init__(self, center: Sequence[float], radius: float) -> None: ... + +class Cuboid: + def __init__( + self, + center: Sequence[float], + euler_xyz: Sequence[float], + half_extents: Sequence[float], + ) -> None: ... + +class Cylinder: + def __init__( + self, + center: Sequence[float], + euler_xyz: Sequence[float], + radius: float, + length: float, + ) -> None: ... + +class PlanningResult: + solved: bool + path: Path + iterations: int + +class PlannerSettings: + pass + +class SimplifySettings: + pass + +class Sampler: + pass + +class RobotModule: + __name__: str + def halton(self) -> Sampler: ... + def validate( + self, + configuration: Sequence[float], + environment: Environment, + check_bounds: bool, + ) -> bool: ... + def validate_motion( + self, + configuration_in: Sequence[float], + configuration_out: Sequence[float], + environment: Environment, + check_bounds: bool, + ) -> bool: ... + def eefk(self, configuration: Sequence[float]) -> NDArray[np.float64]: ... + def simplify( + self, + path: Path, + environment: Environment, + settings: SimplifySettings, + sampler: Sampler, + ) -> PlanningResult: ... + +PlannerFunction = Callable[ + [Sequence[float], Sequence[float], Environment, PlannerSettings, Sampler], + PlanningResult, +] + +def configure_robot_and_planner_with_kwargs( + robot_name: str, + planner_name: str, + max_iterations: int, +) -> tuple[RobotModule, PlannerFunction, PlannerSettings, SimplifySettings]: ... diff --git a/stubs/vamp/baxter.pyi b/stubs/vamp/baxter.pyi new file mode 100644 index 0000000000..c4d7527c1d --- /dev/null +++ b/stubs/vamp/baxter.pyi @@ -0,0 +1,5 @@ +from . import RobotModule + +__name__: str + +def halton() -> RobotModule: ... diff --git a/stubs/vamp/fetch.pyi b/stubs/vamp/fetch.pyi new file mode 100644 index 0000000000..c4d7527c1d --- /dev/null +++ b/stubs/vamp/fetch.pyi @@ -0,0 +1,5 @@ +from . import RobotModule + +__name__: str + +def halton() -> RobotModule: ... diff --git a/stubs/vamp/panda.pyi b/stubs/vamp/panda.pyi new file mode 100644 index 0000000000..c4d7527c1d --- /dev/null +++ b/stubs/vamp/panda.pyi @@ -0,0 +1,5 @@ +from . import RobotModule + +__name__: str + +def halton() -> RobotModule: ... diff --git a/stubs/vamp/sphere.pyi b/stubs/vamp/sphere.pyi new file mode 100644 index 0000000000..c4d7527c1d --- /dev/null +++ b/stubs/vamp/sphere.pyi @@ -0,0 +1,5 @@ +from . import RobotModule + +__name__: str + +def halton() -> RobotModule: ... diff --git a/stubs/vamp/ur5.pyi b/stubs/vamp/ur5.pyi new file mode 100644 index 0000000000..c4d7527c1d --- /dev/null +++ b/stubs/vamp/ur5.pyi @@ -0,0 +1,5 @@ +from . import RobotModule + +__name__: str + +def halton() -> RobotModule: ... diff --git a/uv.lock b/uv.lock index 0736ec2961..cfda49fe11 100644 --- a/uv.lock +++ b/uv.lock @@ -2171,6 +2171,9 @@ unitree-dds = [ { name = "unitree-webrtc-connect-leshy" }, { name = "uvicorn" }, ] +vamp = [ + { name = "vamp-planner" }, +] visualization = [ { name = "dimos-viewer" }, { name = "rerun-sdk" }, @@ -2455,13 +2458,14 @@ requires-dist = [ { name = "unitree-sdk2py-dimos", marker = "extra == 'unitree-dds'", specifier = ">=1.0.2" }, { name = "unitree-webrtc-connect-leshy", marker = "extra == 'unitree'", specifier = ">=2.0.7" }, { name = "uvicorn", marker = "extra == 'web'", specifier = ">=0.34.0" }, + { name = "vamp-planner", marker = "extra == 'vamp'", git = "https://github.com/TomCC7/vamp.git?rev=2b59552179c2af473c59c8617519e4663497fbe6" }, { name = "xacro", marker = "extra == 'manipulation'" }, { name = "xarm-python-sdk", marker = "extra == 'manipulation'", specifier = ">=1.17.0" }, { name = "xarm-python-sdk", marker = "extra == 'misc'", specifier = ">=1.17.0" }, { name = "xformers", marker = "platform_machine == 'x86_64' and extra == 'cuda'", specifier = ">=0.0.20" }, { name = "yapf", marker = "extra == 'misc'", specifier = "==0.40.2" }, ] -provides-extras = ["misc", "visualization", "agents", "web", "perception", "unitree", "unitree-dds", "manipulation", "cpu", "cuda", "psql", "sim", "mapping", "drone", "dds", "base", "apriltag", "all"] +provides-extras = ["misc", "visualization", "agents", "web", "perception", "unitree", "unitree-dds", "manipulation", "vamp", "cpu", "cuda", "psql", "sim", "mapping", "drone", "dds", "base", "apriltag", "all"] [package.metadata.requires-dev] autofix = [{ name = "ruff", specifier = "==0.14.3" }] @@ -11175,6 +11179,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/e4/16/c1fd27e9549f3c4baf1dc9c20c456cd2f822dbf8de9f463824b0c0357e06/uvloop-0.22.1-cp314-cp314t-musllinux_1_2_x86_64.whl", hash = "sha256:6cde23eeda1a25c75b2e07d39970f3374105d5eafbaab2a4482be82f272d5a5e", size = 4296730, upload-time = "2025-10-16T22:17:00.744Z" }, ] +[[package]] +name = "vamp-planner" +version = "0.6.4" +source = { git = "https://github.com/TomCC7/vamp.git?rev=2b59552179c2af473c59c8617519e4663497fbe6#2b59552179c2af473c59c8617519e4663497fbe6" } +dependencies = [ + { name = "numpy", version = "2.2.6", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.11'" }, + { name = "numpy", version = "2.3.5", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.11'" }, +] + [[package]] name = "virtualenv" version = "20.36.1"