Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions ncore/impl/data/compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -564,13 +564,16 @@ def model_parameters(self) -> Optional[ConcreteLidarModelParametersUnion]:
"""Returns parameters specific to the lidar's intrinsic model (optional as not mandatory)"""
...

def get_frame_ray_bundle_model_element(self, frame_index: int) -> Optional[npt.NDArray[np.uint16]]:
def get_frame_ray_bundle_model_element(self, frame_index: int) -> Optional[npt.NDArray[Any]]:
"""Returns the per-ray model elements for a ray bundle for a specific frame, if available.

Args:
frame_index: Index of the frame
Returns:
Array of per-ray model elements [N,] or None if not available
Array of per-ray model element indices / sampling coordinates with a
leading per-ray dimension N and any trailing shape ((N,), (N, K),
(N, A, B, ...)); integer dtype or floating dtype up to float32. None
if not available.
"""
...

Expand Down
2 changes: 1 addition & 1 deletion ncore/impl/data/v4/compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ def model_parameters(self) -> Optional[ConcreteLidarModelParametersUnion]:
return self._model_parameters

@override
def get_frame_ray_bundle_model_element(self, frame_index: int) -> Optional[npt.NDArray[np.uint16]]:
def get_frame_ray_bundle_model_element(self, frame_index: int) -> Optional[npt.NDArray[Any]]:
"""Returns the per-ray model elements for a ray bundle for a specific frame, if available"""

if self.lidar_reader.has_frame_ray_bundle_data(
Expand Down
17 changes: 13 additions & 4 deletions ncore/impl/data/v4/components.py
Original file line number Diff line number Diff line change
Expand Up @@ -1497,8 +1497,8 @@ def store_frame(
], # per-ray unit-norm direction vectors in sensor frame at measure time (raw / not motion-compensated, needs to have unit norm) (float32, [N, 3])
timestamp_us: npt.NDArray[np.uint64], # per-ray timestamp in microseconds (uint64, [N])
model_element: Optional[
npt.NDArray[np.uint16]
], # per-ray model element indices, if applicable (uint16, [N, 2])
npt.NDArray[Any]
], # per-ray model element indices / sampling coordinates, if applicable. Shape is (N, ...): a leading per-ray dimension N followed by any number of trailing dimensions (none -> (N,), or (N, A, B, ...)). dtype may be any integer type (discrete model index) or any floating type up to float32 (continuous model sampling coordinate, e.g. float16 / float32); float64 (and wider) is disallowed as too storage-heavy. The shape and dtype are self-describing in storage and interpreted by the associated lidar model (e.g. (N, 2) (row, col) for the current structured spinning model; extra dimensions may encode zone, scan-direction / frame parity for solid-state models).
# per-ray return data for R returns - non-existing values are indicated via NaNs
distance_m: npt.NDArray[
np.float32
Expand Down Expand Up @@ -1537,8 +1537,17 @@ def store_frame(
ray_data["timestamp_us"] = (timestamp_us, timestamp_us.shape)

if model_element is not None:
assert model_element.shape == (n_rays, 2)
assert model_element.dtype == np.dtype("uint16")
assert model_element.ndim >= 1, "model_element must have a leading per-ray dimension"
assert model_element.shape[0] == n_rays, (
"model_element must have one entry per ray (leading dimension == n_rays)"
)
# Allow any integer dtype (discrete model index) or any floating
# type up to float32 (continuous model sampling coordinate, e.g.
# float16 / float32). float64 (and wider) is disallowed as too
# storage-heavy.
assert np.issubdtype(model_element.dtype, np.integer) or (
np.issubdtype(model_element.dtype, np.floating) and model_element.dtype.itemsize <= 4
), f"model_element must be an integer type or a float type up to float32, got {model_element.dtype}"
ray_data["model_element"] = (model_element, model_element.shape)

## Per return data
Expand Down
167 changes: 166 additions & 1 deletion ncore/impl/data/v4/components_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ def normalize_points(vectors: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
lidar_writer.store_frame(
ref_lidar_direction0,
ref_lidar_timestamp_us0 := np.linspace(0 * 1e6, 0.5 * 1e6, num=5, dtype=np.uint64),
ref_lidar_model_element0 := np.arange(5 * 2, dtype=np.uint16).reshape((5, 2)),
ref_lidar_model_element0 := np.arange(5 * 3, dtype=np.uint16).reshape((5, 3)),
ref_lidar_distance_m0 := lidar_distance_m0[np.newaxis, :],
ref_lidar_intensity0 := np.random.default_rng().random((1, 5)).astype(np.float32),
ref_lidar_timestamps_us0 := np.array([0 * 1e6, 0.5 * 1e6], dtype=np.uint64),
Expand Down Expand Up @@ -3175,3 +3175,168 @@ def test_per_frame_generic_data_zero_dim(self) -> None:
self.assertEqual(gd.shape, (0, 3))

tmpdir.cleanup()


@parameterized_class(
("store_type"),
[
("itar",),
("directory",),
],
)
class TestLidarModelElementTupleDim(unittest.TestCase):
"""Tests that per-ray model_element supports flexible shapes and dtypes.

model_element keeps a leading per-ray dimension N but allows any trailing
shape: flat (N,), the legacy (N, 2) (row, col), or arbitrary (N, A, B, ...).
The dtype may be any integer type (discrete model index) or a floating type
up to float32 (continuous model sampling coordinate, e.g. float16 / float32);
float64 (and wider) is rejected as too storage-heavy. Shape and dtype are
self-describing in storage, so this is a backwards-compatible extension of
the previously fixed (N, 2) uint16 layout.
"""

store_type: Literal["itar", "directory"]

def setUp(self) -> None:
np.set_printoptions(floatmode="unique", linewidth=200, suppress=True)

def _make_lidar_frame(self, n_rays: int) -> Dict[str, np.ndarray]:
rng = np.random.default_rng(7)
raw_pts = rng.random((n_rays, 3)).astype(np.float32) + 0.1
norms = np.linalg.norm(raw_pts, axis=1)
return {
"direction": (raw_pts / norms[:, np.newaxis]).astype(np.float32),
"distance_m": norms[np.newaxis, :].astype(np.float32),
"intensity": rng.random((1, n_rays)).astype(np.float32),
"timestamp_us": np.linspace(0, 100_000, num=n_rays, dtype=np.uint64),
"frame_timestamps_us": np.array([0, 100_000], dtype=np.uint64),
}

def _make_writer(
self,
) -> Tuple[LidarSensorComponent.Writer, SequenceComponentGroupsWriter, tempfile.TemporaryDirectory]:
tmpdir = tempfile.TemporaryDirectory()
store_writer = SequenceComponentGroupsWriter(
output_dir_path=UPath(tmpdir.name),
store_base_name=(seq_id := "lidar-model-element-tuple-dim"),
sequence_id=seq_id,
sequence_timestamp_interval_us=HalfClosedInterval(0, 10_000_001),
store_type=self.store_type,
generic_meta_data={},
)
lidar_writer = store_writer.register_component_writer(LidarSensorComponent.Writer, "lidar_me", "lidars")
return lidar_writer, store_writer, tmpdir

def _roundtrip(self, model_element: np.ndarray) -> np.ndarray:
"""Store a single frame with the given model_element and read it back."""
lidar_writer, store_writer, tmpdir = self._make_writer()
n_rays = model_element.shape[0]
frame = self._make_lidar_frame(n_rays)
lidar_writer.store_frame(
direction=frame["direction"],
timestamp_us=frame["timestamp_us"],
model_element=model_element,
distance_m=frame["distance_m"],
intensity=frame["intensity"],
frame_timestamps_us=frame["frame_timestamps_us"],
generic_data={},
generic_meta_data={},
)
store_paths = store_writer.finalize()
reader = SequenceComponentGroupsReader(component_group_paths=store_paths)
lidar_reader = reader.open_component_readers(LidarSensorComponent.Reader)["lidar_me"]
frame_ts = int(lidar_reader.frames_timestamps_us[0, 1])
read_back = lidar_reader.get_frame_ray_bundle_data(frame_ts, "model_element")
tmpdir.cleanup()
return read_back

def _assert_store_rejected(self, model_element: np.ndarray) -> None:
lidar_writer, _store_writer, tmpdir = self._make_writer()
n_rays = model_element.shape[0] if model_element.ndim >= 1 else 0
frame = self._make_lidar_frame(max(n_rays, 1))
with self.assertRaises(AssertionError):
lidar_writer.store_frame(
direction=frame["direction"],
timestamp_us=frame["timestamp_us"],
model_element=model_element,
distance_m=frame["distance_m"],
intensity=frame["intensity"],
frame_timestamps_us=frame["frame_timestamps_us"],
generic_data={},
generic_meta_data={},
)
tmpdir.cleanup()

def test_legacy_n2_uint16_roundtrip(self) -> None:
"""The original (N, 2) uint16 layout continues to work unchanged."""
me = np.arange(4 * 2, dtype=np.uint16).reshape((4, 2))
read_back = self._roundtrip(me)
self.assertEqual(read_back.shape, (4, 2))
self.assertEqual(read_back.dtype, np.dtype("uint16"))
np.testing.assert_array_equal(read_back, me)

def test_flat_n_roundtrip(self) -> None:
"""A flat per-ray index (N,) round-trips."""
me = np.arange(5, dtype=np.uint32)
read_back = self._roundtrip(me)
self.assertEqual(read_back.shape, (5,))
self.assertEqual(read_back.dtype, np.dtype("uint32"))
np.testing.assert_array_equal(read_back, me)

def test_higher_dim_roundtrip(self) -> None:
"""An (N, A, B) model index round-trips with all trailing dims preserved."""
me = np.arange(5 * 2 * 3, dtype=np.int32).reshape((5, 2, 3))
read_back = self._roundtrip(me)
self.assertEqual(read_back.shape, (5, 2, 3))
self.assertEqual(read_back.dtype, np.dtype("int32"))
np.testing.assert_array_equal(read_back, me)

def test_float32_sampling_coordinate_roundtrip(self) -> None:
"""A float32 continuous sampling coordinate (N, 2) round-trips."""
me = (np.arange(5 * 2, dtype=np.float32).reshape((5, 2)) * 0.5).astype(np.float32)
read_back = self._roundtrip(me)
self.assertEqual(read_back.shape, (5, 2))
self.assertEqual(read_back.dtype, np.dtype("float32"))
np.testing.assert_array_equal(read_back, me)

def test_float16_sampling_coordinate_roundtrip(self) -> None:
"""A float16 continuous sampling coordinate (N, 2) round-trips."""
me = (np.arange(5 * 2, dtype=np.float16).reshape((5, 2)) * 0.5).astype(np.float16)
read_back = self._roundtrip(me)
self.assertEqual(read_back.shape, (5, 2))
self.assertEqual(read_back.dtype, np.dtype("float16"))
np.testing.assert_array_equal(read_back, me)

def test_rejects_float64(self) -> None:
"""float64 model_element is rejected as too storage-heavy."""
me = np.arange(3 * 2, dtype=np.float64).reshape((3, 2))
self._assert_store_rejected(me)

def test_rejects_zero_dim_scalar(self) -> None:
"""A 0-d array (no per-ray dimension) is rejected."""
me = np.array(3, dtype=np.uint16)
self._assert_store_rejected(me)

def test_rejects_wrong_leading_dim(self) -> None:
"""A model_element whose leading dim != n_rays is rejected."""
# n_rays from the frame is 5, but provide 4 leading entries
lidar_writer, _store_writer, tmpdir = self._make_writer()
frame = self._make_lidar_frame(5)
me = np.arange(4 * 2, dtype=np.uint16).reshape((4, 2))
with self.assertRaises(AssertionError):
lidar_writer.store_frame(
direction=frame["direction"],
timestamp_us=frame["timestamp_us"],
model_element=me,
distance_m=frame["distance_m"],
intensity=frame["intensity"],
frame_timestamps_us=frame["frame_timestamps_us"],
generic_data={},
generic_meta_data={},
)
tmpdir.cleanup()


if __name__ == "__main__":
unittest.main()
Loading