Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 53 additions & 20 deletions ncore/impl/common/transformations.py
Original file line number Diff line number Diff line change
Expand Up @@ -529,21 +529,33 @@ def from_sensor_rig(
T_sensor_rig: np.ndarray,
T_rig_worlds: np.ndarray,
T_rig_worlds_timestamps_us: np.ndarray,
anchor_frame_id: str = "world",
rig_frame_id: str = "rig",
) -> MotionCompensator:
"""Constructs a motion-compensator from a sensor-rig transformations for a specific sensor only"""
"""Constructs a motion-compensator from a sensor-rig transformations for a specific sensor only

Args:
sensor_id (str): id of the sensor node
T_sensor_rig (np.ndarray): static sensor -> rig transform, [4, 4]
T_rig_worlds (np.ndarray): time-dependent rig -> anchor transforms, [N, 4, 4]
T_rig_worlds_timestamps_us (np.ndarray): timestamps for the rig -> anchor transforms, [N]
anchor_frame_id (str): name of the common anchor frame node (defaults to "world").
Pass the same value as ``anchor_frame_id`` to the (de)compensation methods.
rig_frame_id (str): name of the rig frame node (defaults to "rig").
"""

return MotionCompensator(
PoseGraphInterpolator(
[
PoseGraphInterpolator.Edge(
source_node="rig",
target_node="world",
source_node=rig_frame_id,
target_node=anchor_frame_id,
T_source_target=T_rig_worlds,
timestamps_us=T_rig_worlds_timestamps_us,
),
PoseGraphInterpolator.Edge(
source_node=sensor_id,
target_node="rig",
target_node=rig_frame_id,
T_source_target=T_sensor_rig,
timestamps_us=None,
),
Expand All @@ -570,8 +582,12 @@ class MotionCompensationResult:
xyz_s_reftime: np.ndarray
# motion-compensated ray segment end points, relative to the sensor frame at the reference time, [N,3]
xyz_e_reftime: np.ndarray
# timestamp of the sensor reference frame the points are expressed in
# reference sensor frame the compensated points are expressed in (equals the sensor_id)
reference_frame_id: str
# timestamp of the reference sensor frame the points are expressed in
reference_timestamp_us: int
# common anchor frame used to compose the compensation transforms
anchor_frame_id: str

def motion_compensate_points(
self,
Expand All @@ -581,6 +597,7 @@ def motion_compensate_points(
frame_start_timestamp_us: int,
frame_end_timestamp_us: int,
reference_timestamp_us: Optional[int] = None,
anchor_frame_id: str = "world",
) -> MotionCompensationResult:
"""
Perform motion compensation of points in time-dependent sensor frame at measurement time to a common reference sensor frame
Expand All @@ -589,17 +606,21 @@ def motion_compensate_points(
Pass ``reference_timestamp_us`` to compensate to a different reference frame
(e.g. the start of the frame). The chosen reference
must be inverted by :meth:`motion_decompensate_points` using the same
``reference_timestamp_us``.
``reference_timestamp_us`` and ``anchor_frame_id``.

Args:
sensor_id (str): sensor the points are relative to
sensor_id (str): sensor the points are relative to. This is also the reference
frame the compensated points are expressed in (``reference_frame_id``).
xyz_pointtime(np.ndarray): points in time-dependent sensor frame (~before motion compensation), [N,3]
timestamp_us(np.ndarray): timestamps of points, [N]
frame_start_timestamp_us(int): frame start timestamp
frame_end_timestamp_us(int): frame end timestamp
reference_timestamp_us(Optional[int]): timestamp of the reference sensor frame to
compensate to; defaults to ``frame_end_timestamp_us``. Must lie within
``[frame_start_timestamp_us, frame_end_timestamp_us]``.
anchor_frame_id(str): common anchor frame used to compose the transforms
(defaults to "world"). Use a non-default anchor when the pose graph is
anchored at a frame other than "world".
Returns:
MotionCompensationResult: result of the motion-compensation
"""
Expand All @@ -614,7 +635,9 @@ def motion_compensate_points(
return self.MotionCompensationResult(
np.empty_like(xyz_pointtime, shape=(0, 3)),
np.empty_like(xyz_pointtime, shape=(0, 3)),
reference_timestamp_us,
reference_frame_id=sensor_id,
reference_timestamp_us=reference_timestamp_us,
anchor_frame_id=anchor_frame_id,
)

assert frame_start_timestamp_us <= timestamp_us.min() and timestamp_us.max() <= frame_end_timestamp_us, (
Expand All @@ -625,18 +648,18 @@ def motion_compensate_points(
)

# Interpolate egomotion at the reference timestamp for the reference sensor pose
T_world_sensor_reftime = self._pose_graph.evaluate_poses(
"world", sensor_id, np.array(reference_timestamp_us, dtype=np.uint64)
T_anchor_sensor_reftime = self._pose_graph.evaluate_poses(
anchor_frame_id, sensor_id, np.array(reference_timestamp_us, dtype=np.uint64)
)

# Determine unique timestamps to only perform actually required pose interpolations (a lot of points share the same timestamp)
timestamp_unique, unique_timestamp_reverse_idxs = np.unique(timestamp_us, return_inverse=True)

# Per-point transform mapping the point-time sensor frame to the reference-time
# sensor frame:
# T_sensor_pointtime_sensor_reftime = T_world_sensor_reftime @ T_sensor_pointtime_world
# T_sensor_pointtime_sensor_reftime = T_anchor_sensor_reftime @ T_sensor_pointtime_anchor
T_sensor_pointtime_sensor_reftime_unique = (
T_world_sensor_reftime @ self._pose_graph.evaluate_poses(sensor_id, "world", timestamp_unique)
T_anchor_sensor_reftime @ self._pose_graph.evaluate_poses(sensor_id, anchor_frame_id, timestamp_unique)
).astype(np.float32)

# Pick sensor positions (in the reference-time pose) for each start point (blow up to original potentially non-unique timestamp range)
Expand All @@ -647,7 +670,13 @@ def motion_compensate_points(
xyz_pointtime[:, np.newaxis, :], T_sensor_pointtime_sensor_reftime_unique[unique_timestamp_reverse_idxs]
).squeeze(1) # N x 3

return self.MotionCompensationResult(xyz_s_reftime, xyz_e_reftime, reference_timestamp_us)
return self.MotionCompensationResult(
xyz_s_reftime,
xyz_e_reftime,
reference_frame_id=sensor_id,
reference_timestamp_us=reference_timestamp_us,
anchor_frame_id=anchor_frame_id,
)

def motion_decompensate_points(
self,
Expand All @@ -657,6 +686,7 @@ def motion_decompensate_points(
frame_start_timestamp_us: int,
frame_end_timestamp_us: int,
reference_timestamp_us: Optional[int] = None,
anchor_frame_id: str = "world",
) -> np.ndarray:
"""
Decompensate motion of motion-compensated points to bring points into time-dependent sensor-frame
Expand All @@ -666,7 +696,8 @@ def motion_decompensate_points(
default this reference is the end of the frame (``frame_end_timestamp_us``),
matching :meth:`motion_compensate_points`. Datasets that compensate to a
different reference (e.g. the start of the frame) can pass
``reference_timestamp_us`` explicitly.
``reference_timestamp_us`` explicitly. ``anchor_frame_id`` must match the value
passed to :meth:`motion_compensate_points`.

Args:
sensor_id (str): sensor the points are relative to
Expand All @@ -677,6 +708,8 @@ def motion_decompensate_points(
reference_timestamp_us(Optional[int]): timestamp of the sensor frame the points are
expressed in; defaults to ``frame_end_timestamp_us``. Must lie within
``[frame_start_timestamp_us, frame_end_timestamp_us]``.
anchor_frame_id(str): common anchor frame used to compose the transforms
(defaults to "world"). Must match the anchor used for compensation.
Returns:
xyz_pointtime(np.array): points in time-dependent sensor frame after motion-decompensation [n,3]
"""
Expand All @@ -701,18 +734,18 @@ def motion_decompensate_points(
# point's own sensor frame. This is the exact inverse of
# :meth:`motion_compensate_points`: per point we build the transform from the
# reference-time sensor frame to the point-time sensor frame:
# T_sensor_reftime_sensor_pointtime = T_world_sensor_pointtime @ T_sensor_reftime_world
# T_sensor_reftime_sensor_pointtime = T_anchor_sensor_pointtime @ T_sensor_reftime_anchor
# A point at the reference timestamp maps through the identity, as expected.
T_sensor_reftime_world = self._pose_graph.evaluate_poses(
sensor_id, "world", np.array(reference_timestamp_us, dtype=np.uint64)
T_sensor_reftime_anchor = self._pose_graph.evaluate_poses(
sensor_id, anchor_frame_id, np.array(reference_timestamp_us, dtype=np.uint64)
) # [4, 4]

# Determine unique timestamps to only perform actually required pose interpolations (a lot of points share the same timestamp)
timestamp_unique, unique_timestamp_reverse_idxs = np.unique(timestamp_us, return_inverse=True)

# evaluate_poses(world, sensor) yields T_world_sensor_pointtime (the inverse of T_sensor_pointtime_world).
T_world_sensor_pointtime_unique = self._pose_graph.evaluate_poses("world", sensor_id, timestamp_unique)
T_sensor_reftime_sensor_pointtime_unique = (T_world_sensor_pointtime_unique @ T_sensor_reftime_world).astype(
# evaluate_poses(anchor, sensor) yields T_anchor_sensor_pointtime (the inverse of T_sensor_pointtime_anchor).
T_anchor_sensor_pointtime_unique = self._pose_graph.evaluate_poses(anchor_frame_id, sensor_id, timestamp_unique)
T_sensor_reftime_sensor_pointtime_unique = (T_anchor_sensor_pointtime_unique @ T_sensor_reftime_anchor).astype(
np.float32
)

Expand Down
103 changes: 103 additions & 0 deletions ncore/impl/common/transformations_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,109 @@ def test_default_reference_matches_explicit_end_of_frame(self):
)
np.testing.assert_array_equal(default, explicit)

def test_result_exposes_frame_ids(self):
"""The result records the reference (sensor) frame and the anchor frame."""
motion_compensator = MotionCompensator(self.loader.pose_graph)
lidar_sensor = self.loader.get_lidar_sensor("lidar_gt_top_p128_v4p5")

xyz_m_ref = lidar_sensor.get_frame_point_cloud(
0, motion_compensation=False, with_start_points=False, return_index=0
).xyz_m_end
timestamp_us = lidar_sensor.get_frame_ray_bundle_timestamp_us(0)
frame_start_us = lidar_sensor.get_frame_timestamp_us(0, types.FrameTimepoint.START)
frame_end_us = lidar_sensor.get_frame_timestamp_us(0, types.FrameTimepoint.END)

result = motion_compensator.motion_compensate_points(
lidar_sensor.sensor_id, xyz_m_ref, timestamp_us, frame_start_us, frame_end_us
)
self.assertEqual(result.reference_frame_id, lidar_sensor.sensor_id)
self.assertEqual(result.anchor_frame_id, "world")
self.assertEqual(result.reference_timestamp_us, frame_end_us)

# The empty-input path carries the same frame metadata.
empty = motion_compensator.motion_compensate_points(
lidar_sensor.sensor_id,
np.empty((0, 3), dtype=np.float32),
np.empty((0,), dtype=np.uint64),
frame_start_us,
frame_end_us,
)
self.assertEqual(empty.reference_frame_id, lidar_sensor.sensor_id)
self.assertEqual(empty.anchor_frame_id, "world")


class TestMotionCompensatorAnchorFrame(unittest.TestCase):
"""Anchor-frame parameterization on a small synthetic pose graph (no external data)."""

def _build_graph(self) -> PoseGraphInterpolator:
# sensor -(static)-> rig -(dynamic)-> world -(static)-> world_global.
# "world" and "world_global" differ only by a static transform, so either is a
# valid anchor that preserves the within-frame motion; "rig" is NOT (the
# sensor is static relative to it, so it would see no motion).
timestamps_us = np.array([1000, 2000], dtype=np.uint64)
T_rig_world = np.stack([get_SE3(np.array([0.0, 0.0, 0.0])), get_SE3(np.array([1.0, 2.0, 3.0]))])
T_sensor_rig = get_SE3(np.array([0.5, -0.5, 0.25]))
T_world_world_global = get_SE3(np.array([10.0, -20.0, 5.0]))
return PoseGraphInterpolator(
[
PoseGraphInterpolator.Edge("rig", "world", T_rig_world, timestamps_us),
PoseGraphInterpolator.Edge("lidar", "rig", T_sensor_rig, None),
PoseGraphInterpolator.Edge("world", "world_global", T_world_world_global, None),
]
)

def test_compensation_independent_of_static_anchor_choice(self):
"""Anchors related by a static transform (world / world_global) give identical results."""
compensator = MotionCompensator(self._build_graph())

rng = np.random.default_rng(0)
xyz = rng.normal(size=(64, 3)).astype(np.float32)
timestamp_us = np.linspace(1000, 2000, 64).astype(np.uint64)

via_world = compensator.motion_compensate_points(
"lidar", xyz, timestamp_us, 1000, 2000, anchor_frame_id="world"
)
via_global = compensator.motion_compensate_points(
"lidar", xyz, timestamp_us, 1000, 2000, anchor_frame_id="world_global"
)

np.testing.assert_array_almost_equal(via_world.xyz_e_reftime, via_global.xyz_e_reftime, decimal=4)
self.assertEqual(via_world.anchor_frame_id, "world")
self.assertEqual(via_global.anchor_frame_id, "world_global")

# Round-trip with a non-default anchor recovers the input.
roundtrip = compensator.motion_decompensate_points(
"lidar", via_global.xyz_e_reftime, timestamp_us, 1000, 2000, anchor_frame_id="world_global"
)
np.testing.assert_array_almost_equal(roundtrip, xyz, decimal=4)

def test_from_sensor_rig_custom_frame_names(self):
"""from_sensor_rig honors custom anchor/rig frame node names."""
timestamps_us = np.array([1000, 2000], dtype=np.uint64)
T_rig_anchor = np.stack([get_SE3(np.array([0.0, 0.0, 0.0])), get_SE3(np.array([2.0, 0.0, -1.0]))])
T_sensor_rig = get_SE3(np.array([0.1, 0.2, 0.3]))

compensator = MotionCompensator.from_sensor_rig(
sensor_id="lidar",
T_sensor_rig=T_sensor_rig,
T_rig_worlds=T_rig_anchor,
T_rig_worlds_timestamps_us=timestamps_us,
anchor_frame_id="ego_world",
rig_frame_id="chassis",
)

rng = np.random.default_rng(1)
xyz = rng.normal(size=(32, 3)).astype(np.float32)
timestamp_us = np.linspace(1000, 2000, 32).astype(np.uint64)

compensated = compensator.motion_compensate_points(
"lidar", xyz, timestamp_us, 1000, 2000, anchor_frame_id="ego_world"
)
roundtrip = compensator.motion_decompensate_points(
"lidar", compensated.xyz_e_reftime, timestamp_us, 1000, 2000, anchor_frame_id="ego_world"
)
np.testing.assert_array_almost_equal(roundtrip, xyz, decimal=4)


def get_SE3(t: np.ndarray) -> np.ndarray:
"""SE3 matrix with variable translation part"""
Expand Down
Loading