feat: Add atomic action abstraction layer for embodied AI motion generation #239
Conversation
- Fix dataclass field ordering in ObjectSemantics (a non-default field followed a default one)
- Convert batch_size property to get_batch_size() method for consistency
- Add missing grasp_types field and get_grasp_by_type() method to GraspPose
- Add missing point_types field and get_points_by_type() method to InteractionPoints
- Add missing velocity_limit and acceleration_limit fields to ActionCfg

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Pull request overview
This PR introduces a new embodichain.lab.sim.atomic_actions module intended to provide a unified “atomic action” abstraction (reach/grasp/move/release) on top of the existing motion-planning stack, plus a small registry/engine and accompanying design spec + unit tests for core data types.
Changes:
- Added core atomic-action data models (`Affordance`, `ObjectSemantics`, `ActionCfg`) and an `AtomicAction` base class.
- Added default atomic actions (`ReachAction`, `GraspAction`, `MoveAction`, `ReleaseAction`) and an `AtomicActionEngine` with a global registry.
- Added a design document and unit tests for core affordance/registry helpers.
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 14 comments.
| File | Description |
|---|---|
| `embodichain/lab/sim/atomic_actions/core.py` | Defines affordance/semantics/config + base action class utilities. |
| `embodichain/lab/sim/atomic_actions/actions.py` | Implements reach/grasp/move/release actions using MotionGenerator/TOPPRA. |
| `embodichain/lab/sim/atomic_actions/engine.py` | Adds engine orchestration + global registry + placeholder semantic analyzer. |
| `embodichain/lab/sim/atomic_actions/__init__.py` | Exposes public API for atomic_actions package. |
| `tests/sim/atomic_actions/test_core.py` | Adds unit tests for affordances, ObjectSemantics, ActionCfg, registry helpers. |
| `tests/sim/atomic_actions/__init__.py` | Marks test package for atomic actions. |
| `docs/superpowers/specs/2026-04-17-atomic-action-abstraction-design.md` | Design specification for the new abstraction layer. |
```python
):
    self.motion_generator = motion_generator
    self.robot = motion_generator.robot
    self.device = self.robot.device
```
AtomicAction.__init__ only accepts motion_generator, but other code in this PR (e.g., action implementations) calls super().__init__(motion_generator, robot, control_part, device) and later relies on self.control_part. Either update AtomicAction.__init__ to accept/store robot, control_part, and device (and keep it consistent with the concrete actions/engine), or update all actions/engine to match the existing constructor and set control_part elsewhere.
Suggested change:
```diff
+    robot: Optional[Robot] = None,
+    control_part: Optional[str] = None,
+    device: Optional[torch.device] = None,
 ):
     self.motion_generator = motion_generator
-    self.robot = motion_generator.robot
-    self.device = self.robot.device
+    self.robot = robot if robot is not None else motion_generator.robot
+    self.control_part = (
+        control_part if control_part is not None else ActionCfg.control_part
+    )
+    self.device = device if device is not None else self.robot.device
```
```python
    device: torch.device = torch.device("cuda"),
    interpolation_type: str = "linear",  # "linear", "cubic", "toppra"
):
    super().__init__(motion_generator, robot, control_part, device)
```
ReachAction.__init__ calls super().__init__(motion_generator, robot, control_part, device), but AtomicAction.__init__ currently only takes motion_generator. This mismatch will prevent instantiation of default actions in AtomicActionEngine.
Suggested change:
```diff
-super().__init__(motion_generator, robot, control_part, device)
+super().__init__(motion_generator)
+self.robot = robot
+self.control_part = control_part
+self.device = device
```
```python
target_states = [
    PlanState(qpos=start_qpos, move_type=MoveType.JOINT_MOVE),
    PlanState(xpos=approach_pose, move_type=MoveType.EEF_MOVE),
]

# Plan trajectory
options = MotionGenOptions(
    control_part=self.control_part,
    is_interpolate=True,
    is_linear=self.interpolation_type == "linear",
    interpolate_position_step=0.002,
    plan_opts=ToppraPlanOptions(
        sample_interval=kwargs.get("sample_interval", 30),
    ),
)
```
ReachAction.execute builds target_states mixing MoveType.JOINT_MOVE and MoveType.EEF_MOVE while also setting MotionGenOptions(is_interpolate=True). MotionGenerator.generate only supports pre-interpolation when all states share the same move_type, so this will error (or produce invalid stacks). Use options.start_qpos for the start state and make all target_states EEF_MOVE, or disable pre-interpolation / split planning into separate stages.
```python
)
success, _ = self.robot.compute_ik(
    pose=target_pose.unsqueeze(0),
    qpos_seed=qpos_seed.unsqueeze(0),
```
ReachAction.validate passes qpos_seed= into Robot.compute_ik, but the API uses joint_seed=. As written, this will raise a TypeError when validation is called.
Suggested change:
```diff
-qpos_seed=qpos_seed.unsqueeze(0),
+joint_seed=qpos_seed.unsqueeze(0),
```
```python
velocity_limit=velocity_limit,
acceleration_limit=acceleration_limit,
```
ToppraPlanOptions does not support velocity_limit / acceleration_limit keyword args (it uses a constraints dict instead). Passing these will raise a TypeError when constructing ToppraPlanOptions; map the limits into constraints={"velocity": ..., "acceleration": ...} or extend the planner options type if needed.
Suggested change:
```diff
-velocity_limit=velocity_limit,
-acceleration_limit=acceleration_limit,
+constraints={
+    "velocity": velocity_limit,
+    "acceleration": acceleration_limit,
+},
```
```python
)
success, _ = self.robot.compute_ik(
    pose=grasp_pose.unsqueeze(0),
    qpos_seed=qpos_seed.unsqueeze(0),
```
GraspAction.validate passes qpos_seed= into Robot.compute_ik, but the API uses joint_seed=. This will raise a TypeError when validation is called.
Suggested change:
```diff
-qpos_seed=qpos_seed.unsqueeze(0),
+joint_seed=qpos_seed.unsqueeze(0),
```
```python
is_linear=is_linear,
interpolate_position_step=0.002,
plan_opts=ToppraPlanOptions(
    sample_interval=kwargs.get("sample_interval", 30),
```
MoveAction.execute references kwargs.get(...) when building ToppraPlanOptions, but execute() does not accept **kwargs. This will raise NameError: name 'kwargs' is not defined. Either add **kwargs to the signature or remove the kwargs usage.
Suggested change:
```diff
-sample_interval=kwargs.get("sample_interval", 30),
+sample_interval=30,
```
```python
from .core import GraspPose, InteractionPoints

# Generate default grasp poses based on object type
default_poses = torch.eye(4).unsqueeze(0)
default_poses[0, 2, 3] = 0.1  # Default offset

grasp_affordance = GraspPose(
    object_label=label,
    poses=default_poses,
    grasp_types=["default"],
)

# Default interaction points
interaction_affordance = InteractionPoints(
    object_label=label,
    points=torch.zeros(1, 3),
    point_types=["contact"],
)
```
SemanticAnalyzer.analyze creates tensors (torch.eye, torch.zeros) on the default device (CPU). If the rest of the action stack operates on GPU, downstream ops like object_pose @ grasp_pose can fail with device mismatch. Consider constructing these tensors on the engine/analyzer device (or explicitly moving affordance tensors to self.device).
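A minimal sketch of the device-aware construction this suggests (the function name is illustrative; the real analyzer would pass its own device):

```python
import torch

def build_default_affordance_tensors(device: torch.device):
    """Construct default affordance tensors directly on the target device.

    Creating them on CPU and mixing them with GPU poses later would make
    ops like `object_pose @ grasp_pose` fail with a device mismatch.
    """
    default_poses = torch.eye(4, device=device).unsqueeze(0)  # (1, 4, 4)
    default_poses[0, 2, 3] = 0.1  # default approach offset along +z
    default_points = torch.zeros(1, 3, device=device)  # (1, 3)
    return default_poses, default_points
```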
```python
from embodichain.lab.sim.atomic_actions import AtomicAction


class TestAction(AtomicAction):
    def execute(self, target, **kwargs):
        return PlanResult(success=True)
```
PlanResult is used in these test-only AtomicAction implementations, but it is never imported in this test module. This will raise NameError when the test class methods are executed; import PlanResult from embodichain.lab.sim.planners or reference it via the module where it is defined.
```python
# Get current state if not provided
if start_qpos is None:
    start_qpos = self._get_current_qpos()
```
ReachAction.execute calls self._get_current_qpos(), but AtomicAction in core.py does not define this helper (it exists in the design doc but not in the implementation). This will raise AttributeError at runtime unless all concrete actions re-implement it; consider adding _get_current_qpos() to AtomicAction (likely using robot.get_qpos(name=self.control_part)[0]).
Co-authored-by: chenjian <chenjian@dexforce.com>
Co-authored-by: yuecideng <dengyueci@qq.com>
Co-authored-by: Copilot <copilot@github.com>
Pull request overview
Copilot reviewed 19 out of 19 changed files in this pull request and generated 11 comments.
```python
# Resolve grasp pose
if isinstance(target, ObjectSemantics):
    is_success, grasp_xpos, open_length = self._resolve_grasp_pose(target)
else:
    is_success, grasp_xpos = self._resolve_pose_target(
        target, action_name=self.__class__.__name__
    )

# TODO: warning and fallback if no valid grasp pose found
if not is_success:
    logger.log_warning(
        "Failed to resolve grasp pose, using default approach pose"
    )
    return False, torch.empty(0), self.joint_ids
```
When the target is ObjectSemantics, _resolve_grasp_pose() returns is_success as a boolean tensor (one per env). This if not is_success: check will raise due to ambiguous tensor truthiness. Convert to a scalar (e.g., is_success.all().item()) or branch per-environment and propagate failures in a structured way.
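The ambiguity can be avoided by reducing the per-env mask to a scalar before branching; a small sketch of the pattern (helper names are illustrative):

```python
import torch

def all_envs_succeeded(is_success: torch.Tensor) -> bool:
    """Reduce a per-environment boolean mask to a single Python bool.

    `if not is_success:` raises "Boolean value of Tensor is ambiguous"
    for multi-element tensors, so reduce with .all() first.
    """
    return bool(is_success.all().item())

def failed_env_ids(is_success: torch.Tensor) -> list[int]:
    """List the environment indices whose grasp resolution failed."""
    return (~is_success).nonzero(as_tuple=False).flatten().tolist()
```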
```python
@abstractmethod
def execute(
    self,
    target: Union[torch.Tensor, ObjectSemantics],
    start_qpos: Optional[torch.Tensor] = None,
    **kwargs,
) -> tuple[bool, torch.Tensor, list[float]]:
    """execute pick up action

    Args:
        target (ObjectSemantics): object semantics containing grasp affordance and entity information
        start_qpos (Optional[torch.Tensor], optional): Planning start qpos. Defaults to None.

    Returns:
        tuple[bool, torch.Tensor, list[float]]:
            is_success,
            trajectory of shape (n_envs, n_waypoints, dof),
            joint_ids corresponding to trajectory
    """
```
The execute() return type annotation says list[float] for joint_ids, but these IDs are used as tensor indices throughout the engine/actions and should be list[int] (or Sequence[int]). The current annotation is misleading and can hide real typing issues.
```python
result = pose.clone()
result[:, :3, 3] += offset
```
_apply_offset says it applies an offset "in local frame", but the implementation adds the offset directly to the translation component (world-frame offset). Either update the implementation to rotate the offset by the pose rotation (local-frame semantics) or update the docstring/parameter naming to reflect world-frame behavior.
Suggested change:
```diff
-result = pose.clone()
-result[:, :3, 3] += offset
+if offset.shape[0] == 1 and pose.shape[0] != 1:
+    offset = offset.expand(pose.shape[0], -1)
+elif offset.shape[0] != pose.shape[0]:
+    logger.log_error("offset batch dimension must be 1 or match pose batch dimension")
+result = pose.clone()
+world_offset = torch.bmm(pose[:, :3, :3], offset.unsqueeze(-1)).squeeze(-1)
+result[:, :3, 3] += world_offset
```
```python
def create_mug(sim: SimulationManager) -> RigidObject:
    mug_cfg = RigidObjectCfg(
        uid="table",
```
create_mug() sets uid="table" even though it loads a cup mesh and the rest of the test treats it as a mug. This is confusing and can cause downstream code (logging, lookups by uid/label) to be inconsistent. Use a uid consistent with the object (e.g., "mug").
Suggested change:
```diff
-uid="table",
+uid="mug",
```
```python
def __init__(
    self,
    motion_generator: MotionGenerator,
    cfg: ActionCfg = ActionCfg(),
):
    """
    Initialize the atomic action.

    Args:
        motion_generator: The motion generator instance to use for planning.
        cfg: Configuration for the action.
    """
    self.motion_generator = motion_generator
    self.cfg = cfg
    self.robot = motion_generator.robot
    self.control_part = cfg.control_part
    self.device = self.robot.device
```
AtomicAction.__init__ uses `cfg: ActionCfg = ActionCfg()` as a default argument. This creates a single shared config instance at import time, which can lead to cross-instance state leakage if the config is mutated. Prefer `cfg: ActionCfg | None = None` and instantiate `ActionCfg()` inside the constructor when `None`.
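A sketch of the `None`-default pattern using stand-in classes (the real `ActionCfg`/`AtomicAction` live in `core.py`):

```python
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class ActionCfgSketch:  # stand-in for ActionCfg
    control_part: str = "arm"
    properties: dict = field(default_factory=dict)

class AtomicActionSketch:  # stand-in for AtomicAction
    def __init__(self, motion_generator, cfg: Optional[ActionCfgSketch] = None):
        # Instantiate the default inside the constructor so each action gets
        # its own config; a `cfg=ActionCfg()` default would be built once at
        # import time and shared (and mutated) across all instances.
        self.motion_generator = motion_generator
        self.cfg = cfg if cfg is not None else ActionCfgSketch()
```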
```python
__all__ = [
    # Core classes
    "Affordance",
    "GraspPose",
```
__all__ includes "GraspPose", but this module does not define or import a GraspPose symbol. This will make from embodichain.lab.sim.atomic_actions import GraspPose fail and can also confuse docs/autosummary generation. Either implement/export GraspPose or remove it from __all__. Also consider adding AntipodalAffordance to __all__ since it is part of the public API used by the tutorial.
Suggested change:
```diff
-"GraspPose",
+"AntipodalAffordance",
```
```python
config = SimulationManagerCfg(
    headless=True,
    sim_device="cuda",
    physics_dt=1.0 / 100.0,
    num_envs=1,
```
This test hard-requires CUDA (sim_device="cuda") and calls sim.init_gpu_physics(). In CI or dev environments without a GPU this will fail. Consider switching the test to CPU execution (as many other sim tests do) or marking it with pytest.mark.skipif(not torch.cuda.is_available(), ...) / a heavier integration-test marker so it doesn't gate unit test runs.
```python
super().__init__(
    motion_generator, cfg=cfg if cfg is not None else PickUpActionCfg()
)
self.cfg = cfg
```
PickUpAction.__init__ overwrites the config set by the base class with `self.cfg = cfg`. When `cfg` is `None` (the common/default case), this sets `self.cfg` to `None`, and the next line (`self.cfg.approach_direction`) will raise an `AttributeError`. Keep the config created in the parent (`super().__init__(...)`) or set `self.cfg` to the resolved default config object instead of the raw argument.
Suggested change:
```diff
-self.cfg = cfg
```
```python
super().__init__(
    motion_generator, cfg=cfg if cfg is not None else PlaceActionCfg()
)
self.cfg = cfg
if self.cfg.hand_open_qpos is None:
    logger.log_error("hand_open_qpos must be specified in PlaceActionCfg")
if self.cfg.hand_close_qpos is None:
    logger.log_error("hand_close_qpos must be specified in PlaceActionCfg")
self.hand_open_qpos = self.cfg.hand_open_qpos.to(self.device)
self.hand_close_qpos = self.cfg.hand_close_qpos.to(self.device)
```
PlaceAction.__init__ has the same issue as PickUpAction: `self.cfg = cfg` can set `self.cfg` to `None`, but later code assumes it is a PlaceActionCfg instance (e.g., accessing hand_open_qpos/hand_close_qpos). Avoid overwriting `self.cfg` with the raw constructor argument; keep the resolved config from the base constructor or explicitly default it.
```python
is_success, qpos = self.robot.compute_ik(
    pose=xpos_traj[:, j], name=self.cfg.control_part, joint_seed=qpos_seed
)
if not is_success:
    logger.log_warning(
        f"Failed to compute IK for target state {j} in some environments. "
        "The resulting trajectory may be invalid."
    )
    return False, trajectory
```
robot.compute_ik() returns a boolean tensor (shape (n_envs,)), but this code uses if not is_success: which is invalid for multi-element tensors (and will raise "Boolean value of Tensor is ambiguous" even for shape (1,)). Use if not is_success.all().item(): (or handle per-env failures explicitly) before proceeding.
Pull request overview
Copilot reviewed 20 out of 20 changed files in this pull request and generated 4 comments.
```python
"""Abstract base class for atomic actions.

All atomic actions use PlanResult from embodichain.lab.sim.planners
as the return type for execute() method, ensuring consistency with
the existing motion planning infrastructure.
"""

def __init__(
    self,
    motion_generator: MotionGenerator,
    cfg: ActionCfg = ActionCfg(),
):
    """
    Initialize the atomic action.

    Args:
        motion_generator: The motion generator instance to use for planning.
        cfg: Configuration for the action.
    """
    self.motion_generator = motion_generator
    self.cfg = cfg
    self.robot = motion_generator.robot
    self.control_part = cfg.control_part
    self.device = self.robot.device

@abstractmethod
def execute(
    self,
    target: Union[torch.Tensor, ObjectSemantics],
    start_qpos: Optional[torch.Tensor] = None,
    **kwargs,
) -> tuple[bool, torch.Tensor, list[float]]:
    """execute pick up action
```
The AtomicAction class docstring says actions “use PlanResult … as the return type for execute()”, but the abstract execute() signature actually returns a 3-tuple (bool, Tensor, joint_ids). This mismatch will confuse implementers and users of the abstraction layer. Update either the documentation or the signature so they agree (and also consider typing joint_ids as list[int] rather than list[float]).
```python
properties = target.get("properties")
if properties is not None:
    semantics.properties.update(properties)

uid = target.get("uid")
if uid is not None:
    semantics.uid = uid
```
_resolve_target mutates and extends the returned ObjectSemantics instance (semantics.properties.update(...), semantics.uid = ...). If the semantic analyzer returns a cached object (the default path), this will leak per-call state into the cache and affect later resolutions. Also, ObjectSemantics doesn’t define a uid field, so this becomes an ad-hoc attribute. Consider copying the semantics before applying overrides and/or adding an explicit uid: str | None field to ObjectSemantics.
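A sketch of copy-before-override using a stand-in dataclass (with the suggested explicit `uid` field; names are hypothetical):

```python
from dataclasses import dataclass, field, replace
from typing import Optional

@dataclass
class ObjectSemanticsSketch:  # stand-in for ObjectSemantics
    label: str
    properties: dict = field(default_factory=dict)
    uid: Optional[str] = None

def apply_overrides(cached: ObjectSemanticsSketch,
                    properties: Optional[dict] = None,
                    uid: Optional[str] = None) -> ObjectSemanticsSketch:
    # Copy the cached entry (including its properties dict) so per-call
    # overrides never leak back into the analyzer's cache.
    resolved = replace(cached, properties=dict(cached.properties))
    if properties:
        resolved.properties.update(properties)
    if uid is not None:
        resolved.uid = uid
    return resolved
```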
```python
    target_pose: Target pose [4, 4]
    qpos_seed: Seed configuration [DOF]

Returns:
    Joint configuration [DOF]

Raises:
    RuntimeError: If IK fails to find a solution
"""
if qpos_seed is None:
    qpos_seed = self.robot.get_qpos()

success, qpos = self.robot.compute_ik(
    pose=target_pose.unsqueeze(0),
    qpos_seed=qpos_seed.unsqueeze(0),
    name=self.control_part,
)

if not success.all():
    raise RuntimeError(f"IK failed for target pose: {target_pose}")

return qpos.squeeze(0)
```
AtomicAction._ik_solve calls Robot.compute_ik with a non-existent keyword argument qpos_seed and also assumes qpos_seed is 1D (it defaults to robot.get_qpos(), which is typically batched). This will raise at runtime and/or produce an incorrectly-shaped seed. Use the Robot API’s joint_seed parameter and normalize qpos_seed to the expected shape before calling compute_ik (e.g., accept (dof,) or (n_envs,dof) and pass through without adding extra batch dims).
Suggested change:
```diff
-    target_pose: Target pose [4, 4]
-    qpos_seed: Seed configuration [DOF]
-Returns:
-    Joint configuration [DOF]
-Raises:
-    RuntimeError: If IK fails to find a solution
-"""
-if qpos_seed is None:
-    qpos_seed = self.robot.get_qpos()
-success, qpos = self.robot.compute_ik(
-    pose=target_pose.unsqueeze(0),
-    qpos_seed=qpos_seed.unsqueeze(0),
-    name=self.control_part,
-)
-if not success.all():
-    raise RuntimeError(f"IK failed for target pose: {target_pose}")
-return qpos.squeeze(0)
+    target_pose: Target pose [4, 4] or [B, 4, 4]
+    qpos_seed: Seed configuration [DOF] or [B, DOF]
+Returns:
+    Joint configuration [DOF] or [B, DOF]
+Raises:
+    RuntimeError: If IK fails to find a solution
+"""
+single_pose = target_pose.dim() == 2
+if single_pose:
+    pose = target_pose.unsqueeze(0)
+elif target_pose.dim() == 3 and target_pose.shape[1:] == (4, 4):
+    pose = target_pose
+else:
+    raise RuntimeError(
+        f"target_pose must have shape [4, 4] or [B, 4, 4], got {tuple(target_pose.shape)}"
+    )
+if qpos_seed is None:
+    qpos_seed = self.robot.get_qpos()
+if qpos_seed.dim() == 1:
+    joint_seed = qpos_seed.unsqueeze(0)
+elif qpos_seed.dim() == 2:
+    joint_seed = qpos_seed
+else:
+    raise RuntimeError(
+        f"qpos_seed must have shape [DOF] or [B, DOF], got {tuple(qpos_seed.shape)}"
+    )
+if joint_seed.shape[0] != pose.shape[0]:
+    if joint_seed.shape[0] == 1:
+        joint_seed = joint_seed.expand(pose.shape[0], -1)
+    else:
+        raise RuntimeError(
+            "Batch size mismatch between target_pose and qpos_seed: "
+            f"{pose.shape[0]} != {joint_seed.shape[0]}"
+        )
+success, qpos = self.robot.compute_ik(
+    pose=pose,
+    joint_seed=joint_seed,
+    name=self.control_part,
+)
+if not success.all():
+    raise RuntimeError(f"IK failed for target pose: {target_pose}")
+return qpos.squeeze(0) if single_pose else qpos
```
```python
def _draw_grasp_xpos(self, grasp_xpos: torch.Tensor, open_length: torch.Tensor):
    sim = SimulationManager.get_instance()
    axis_xpos = []
    for i in range(grasp_xpos.shape[0]):
        axis_xpos.append(grasp_xpos[i].to("cpu").numpy())
    sim.draw_marker(
        cfg=MarkerCfg(
            name="grasp_xpos",
```
AntipodalAffordance._draw_grasp_xpos references SimulationManager and MarkerCfg but neither is imported in this module, so enabling is_draw_grasp_xpos will raise a NameError. Import these (preferably inside the method to avoid circular deps, consistent with other sim modules) before calling SimulationManager.get_instance() / MarkerCfg(...).
Pull request overview
Copilot reviewed 20 out of 20 changed files in this pull request and generated 9 comments.
```python
if not is_success:
    logger.log_warning(
        f"Failed to compute IK for target state {j} in some environments. "
        "The resulting trajectory may be invalid."
```
robot.compute_ik returns a boolean tensor (shape (n_envs,)). The check if not is_success: will raise "Boolean value of Tensor is ambiguous" at runtime. Use an aggregate check like if not is_success.all(): (and decide whether partial-env failure should be allowed) and handle the per-env failures accordingly.
Suggested change:
```diff
-if not is_success:
-    logger.log_warning(
-        f"Failed to compute IK for target state {j} in some environments. "
-        "The resulting trajectory may be invalid."
+if not bool(is_success.all().item()):
+    failed_env_ids = (~is_success).nonzero(as_tuple=False).flatten().tolist()
+    logger.log_warning(
+        f"Failed to compute IK for target state {j} in environments "
+        f"{failed_env_ids}. The resulting trajectory may be invalid."
```
```python
    approach_direction: torch.Tensor = torch.tensor(
        [0, 0, -1], dtype=torch.float32
    ),
) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
    if self.generator is None:
        self._init_generator()
```
get_best_grasp_poses has a default approach_direction=torch.tensor(...) created on CPU at import time. If the grasp generator runs on CUDA, passing this default will cause a device mismatch inside GraspGenerator computations. Consider making the default None and creating/moving the tensor to obj_poses.device (or self.generator.device) inside the method.
Suggested change:
```diff
-    approach_direction: torch.Tensor = torch.tensor(
-        [0, 0, -1], dtype=torch.float32
-    ),
-) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
-    if self.generator is None:
-        self._init_generator()
+    approach_direction: Optional[torch.Tensor] = None,
+) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
+    if self.generator is None:
+        self._init_generator()
+    if approach_direction is None:
+        approach_direction = torch.tensor(
+            [0, 0, -1], dtype=torch.float32, device=obj_poses.device
+        )
+    else:
+        approach_direction = approach_direction.to(obj_poses.device)
```
```python
__all__ = [
    # Core classes
    "Affordance",
    "GraspPose",
    "InteractionPoints",
    "ObjectSemantics",
    "ActionCfg",
    "AtomicAction",
    # Action implementations
```
__all__ includes "GraspPose", but there is no GraspPose symbol defined or imported in this package. This will break from embodichain.lab.sim.atomic_actions import * and can confuse doc generation. Remove it or export the correct symbol (e.g., AntipodalAffordance if that’s what was intended).
```python
All atomic actions use PlanResult from embodichain.lab.sim.planners
as the return type for execute() method, ensuring consistency with
the existing motion planning infrastructure.
```
The AtomicAction class docstring states that actions use PlanResult as the return type for execute(), but the abstract method is actually defined to return a raw tuple. Please update the docstring (or the method signature) so the documented return type matches the real API.
Suggested change:
```diff
-All atomic actions use PlanResult from embodichain.lab.sim.planners
-as the return type for execute() method, ensuring consistency with
-the existing motion planning infrastructure.
+Atomic actions expose an ``execute()`` method that returns a tuple of
+``(is_success, trajectory, joint_ids)``, where ``is_success`` indicates
+whether planning succeeded, ``trajectory`` is the planned joint-space
+trajectory tensor, and ``joint_ids`` identifies the joints corresponding
+to that trajectory.
```
```python
if len(target_list) != len(action_names):
    logger.log_error(
        f"Length of target_list ({len(target_list)}) must match number of actions ({len(action_names)})."
    )
start_qpos = self.motion_generator.robot.get_qpos()
n_envs = start_qpos.shape[0]
all_dof = self.motion_generator.robot.dof
```
execute_static logs an error when len(target_list) doesn’t match the number of configured actions, but it still proceeds. Because zip(...) truncates, this can silently skip actions or ignore extra targets while still returning success. This should return early (False / raise) after logging so callers don’t get a partially planned trajectory.
Suggested change:
```diff
-if len(target_list) != len(action_names):
-    logger.log_error(
-        f"Length of target_list ({len(target_list)}) must match number of actions ({len(action_names)})."
-    )
-start_qpos = self.motion_generator.robot.get_qpos()
-n_envs = start_qpos.shape[0]
-all_dof = self.motion_generator.robot.dof
+start_qpos = self.motion_generator.robot.get_qpos()
+n_envs = start_qpos.shape[0]
+all_dof = self.motion_generator.robot.dof
+if len(target_list) != len(action_names):
+    logger.log_error(
+        f"Length of target_list ({len(target_list)}) must match number of actions ({len(action_names)})."
+    )
+    return (
+        False,
+        torch.empty(
+            size=(n_envs, 0, all_dof),
+            dtype=torch.float32,
+            device=self.device,
+        ),
+    )
```
```python
def _init_actions(
    self, actions_cfg_list: Optional[List[ActionCfg]] = None
) -> Dict[str, "AtomicAction"]:
    actions: Dict[str, AtomicAction] = {}
    from .actions import MoveAction, PickUpAction, PlaceAction

    builtin_action_map: Dict[str, Type[AtomicAction]] = {
        "move": MoveAction,
        "pick_up": PickUpAction,
        "place": PlaceAction,
    }
    if actions_cfg_list is not None:
        for cfg in actions_cfg_list:
            action_class = builtin_action_map.get(
                cfg.name
            ) or _global_action_registry.get(cfg.name)
            if action_class is None:
                logger.log_error(f"Unknown action name in config: {cfg.name}")
                continue
            instance = action_class(motion_generator=self.motion_generator, cfg=cfg)
            actions[cfg.name] = instance
    return actions
```
_init_actions stores actions in a dict keyed by cfg.name. This drops duplicates and can reorder/overwrite actions if actions_cfg_list contains repeated names (e.g., two move actions), which conflicts with the documented positional sequencing behavior. Consider storing actions as an ordered list (or list of (name, instance) pairs) so duplicates and order are preserved.
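An order-preserving variant can be sketched with plain dicts standing in for the configs and action classes:

```python
from typing import Callable, Dict, List, Optional, Tuple

def init_actions_ordered(
    actions_cfg_list: Optional[List[dict]],
    builtin_action_map: Dict[str, Callable],
    registry: Dict[str, Callable],
) -> List[Tuple[str, object]]:
    # A list of (name, instance) pairs keeps configured order and allows
    # duplicate names (e.g. two "move" steps), unlike a name-keyed dict
    # that silently overwrites repeated entries.
    actions: List[Tuple[str, object]] = []
    for cfg in actions_cfg_list or []:
        name = cfg["name"]
        action_class = builtin_action_map.get(name) or registry.get(name)
        if action_class is None:
            raise ValueError(f"Unknown action name in config: {name}")
        actions.append((name, action_class(cfg)))
    return actions
```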
```python
semantics = self._semantic_analyzer.analyze(
    label=label,
    geometry=geometry,
    custom_config=custom_config,
    use_cache=use_cache,
)

properties = target.get("properties")
if properties is not None:
    semantics.properties.update(properties)

uid = target.get("uid")
if uid is not None:
    semantics.uid = uid

return semantics
```
In _resolve_target, when the input is a dict you mutate the returned ObjectSemantics (semantics.properties.update(...) and semantics.uid = uid). If the analyzer returned a cached semantics instance, these mutations will persist across future calls for the same label. Also, ObjectSemantics doesn’t define a uid field, so this adds an ad-hoc attribute. Prefer returning a new semantics object (or a copy) whenever overrides like properties/uid are provided, and add an explicit uid field to ObjectSemantics if it’s part of the API.
```python
def create_robot(sim: SimulationManager, position=[0.0, 0.0, 0.0]):
    """
    Create and configure a robot with an arm and a dexterous hand in the simulation.
```
create_robot uses a mutable default argument (position=[0.0, 0.0, 0.0]). If the list is ever mutated, the default will be shared across calls. Use position: Sequence[float] | None = None and assign a new list inside the function.
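The pitfall and the fix, sketched with generic functions (not the real `create_robot`):

```python
def spawn_buggy(position=[0.0, 0.0, 0.0]):
    # BUG: the default list is created once at definition time, so any
    # mutation here is visible to every later call that uses the default.
    position[0] += 1.0
    return position

def spawn_fixed(position=None):
    # A fresh list per call keeps the default stable across calls.
    if position is None:
        position = [0.0, 0.0, 0.0]
    position[0] += 1.0
    return position
```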
```python
def execute(
    self,
    target: Union[torch.Tensor, ObjectSemantics],
    start_qpos: Optional[torch.Tensor] = None,
    **kwargs,
) -> tuple[bool, torch.Tensor, list[float]]:
    """execute pick up action
```
execute() is annotated to return tuple[bool, torch.Tensor, list[float]], but the third item is used as joint_ids (indices) throughout the engine/actions. This should be a list of ints (or a sequence of ints) to match usage and avoid type confusion for custom action implementers.
Description
This PR introduces an atomic action abstraction layer for embodied AI motion generation. The implementation provides a unified interface for atomic actions like reach, grasp, move, etc., with support for semantic object understanding and extensible custom action registration.
Key Components
Core Classes (`core.py`):
- `Affordance` - Base class for affordance data (GraspPose, InteractionPoints)
- `ObjectSemantics` - Semantic information about interaction targets
- `ActionCfg` - Configuration class for atomic actions
- `AtomicAction` - Abstract base class for all atomic actions

Action Implementations (`actions.py`):
- `ReachAction` - Reach to target pose or object
- `GraspAction` - Execute grasp motion
- `ReleaseAction` - Release grasp
- `MoveAction` - Move to target pose

Action Engine (`engine.py`):
- `AtomicActionEngine` - Execution engine for atomic actions

Design Principles
- Builds on the existing `MotionGenerator`, `PlanResult`, and IK/FK solvers

Design Document
See `docs/superpowers/specs/2026-04-17-atomic-action-abstraction-design.md` for the detailed design specification.

Type of change
Testing
Unit tests are provided in `tests/sim/atomic_actions/test_core.py`, covering affordances, ObjectSemantics, ActionCfg, and the registry helpers. All 12 tests passing.
Checklist
- Used the `black .` command to format the code base.

🤖 Generated with Claude Code