diff --git a/dimos/core/transport.py b/dimos/core/transport.py index 6435003758..19aedc6827 100644 --- a/dimos/core/transport.py +++ b/dimos/core/transport.py @@ -162,6 +162,38 @@ def stop(self) -> None: self._started = False +class H264LcmTransport(LCMTransport): # type: ignore[type-arg] + def __init__( + self, + topic: str, + type: type, + config: Any | None = None, + decode_images: bool = True, + **kwargs: Any, + ) -> None: # type: ignore[no-untyped-def] + from dimos.protocol.pubsub.impl.h264_lcm import H264LCM + from dimos.protocol.video.h264 import H264Config + + self.config = config or H264Config() + self.decode_images = decode_images + self.lcm = H264LCM(config=self.config, decode_images=decode_images, **kwargs) # type: ignore[assignment] + super().__init__(topic, type) + + def __reduce__(self): # type: ignore[no-untyped-def] + return ( + H264LcmTransport, + (self.topic.topic, self.topic.lcm_type, self.config, self.decode_images), + ) + + def start(self) -> None: + self.lcm.start() + self._started = True + + def stop(self) -> None: + self.lcm.stop() + self._started = False + + class pSHMTransport(PubSubTransport[T]): _started: bool = False diff --git a/dimos/experimental/security_demo/depth_estimator.py b/dimos/experimental/security_demo/depth_estimator.py index 5737d3f006..55abb4b6a8 100644 --- a/dimos/experimental/security_demo/depth_estimator.py +++ b/dimos/experimental/security_demo/depth_estimator.py @@ -84,7 +84,7 @@ def _loop(self) -> None: def _process(self, image: Image) -> None: rgb = image.to_rgb() - pil_image = PILImage.fromarray(rgb.data) + pil_image = PILImage.fromarray(rgb.require_raw("DepthEstimator._process")) if pil_image.width > _DEPTH_MAX_WIDTH: scale = _DEPTH_MAX_WIDTH / pil_image.width new_h = int(pil_image.height * scale) diff --git a/dimos/experimental/security_demo/security_module.py b/dimos/experimental/security_demo/security_module.py index 9569227805..dbc9c2dc5c 100644 --- a/dimos/experimental/security_demo/security_module.py +++ b/dimos/experimental/security_demo/security_module.py @@ -299,7 +299,7 @@ def _patrol_step(self) -> None: ) annotated = draw_bounding_box( - image.data.copy(), + image.require_raw("SecurityModule._detection_step").copy(), list(best.bbox), label=best.name, confidence=best.confidence, @@ -340,7 +340,7 @@ def _follow_step(self) -> None: twist = self._visual_servo.compute_twist(best.bbox, latest_image.width) self.cmd_vel.publish(twist) - overlay = latest_image.data.copy() + overlay = latest_image.require_raw("SecurityModule._follow_step").copy() if hasattr(best, "mask") and best.mask is not None: mask_bool = best.mask > 0 green = np.zeros_like(overlay) diff --git a/dimos/mapping/occupancy/visualize_path.py b/dimos/mapping/occupancy/visualize_path.py index 89dcf83067..41b19e5686 100644 --- a/dimos/mapping/occupancy/visualize_path.py +++ b/dimos/mapping/occupancy/visualize_path.py @@ -30,7 +30,7 @@ def visualize_path( scale: int = 8, ) -> Image: image = visualize_occupancy_grid(occupancy_grid, "rainbow") - bgr = image.data + bgr = image.require_raw("visualize_path") bgr = cv2.resize( bgr, diff --git a/dimos/mapping/osm/current_location_map.py b/dimos/mapping/osm/current_location_map.py index d573370a06..d723e1e2ff 100644 --- a/dimos/mapping/osm/current_location_map.py +++ b/dimos/mapping/osm/current_location_map.py @@ -74,7 +74,8 @@ def _fetch_new_map(self) -> None: assert self._map_image is not None assert self._position is not None - pil_image = PILImage.fromarray(self._map_image.image.data) + map_data = self._map_image.image.require_raw("CurrentLocationMap._fetch_new_map") + pil_image = PILImage.fromarray(map_data) draw = ImageDraw.Draw(pil_image) x, y = self._map_image.latlon_to_pixel(self._position) radius = 20 @@ -85,7 +86,7 @@ def _fetch_new_map(self) -> None: width=3, ) - self._map_image.image.data[:] = np.array(pil_image) + map_data[:] = np.array(pil_image) def _position_is_too_far_off_center(self) -> bool: x, y = self._map_image.latlon_to_pixel(self._position) # type: ignore[arg-type, union-attr] diff --git a/dimos/memory2/codecs/base.py b/dimos/memory2/codecs/base.py index 821b36b60f..ed9c76f6d2 100644 --- a/dimos/memory2/codecs/base.py +++ b/dimos/memory2/codecs/base.py @@ -75,6 +75,11 @@ def codec_from_id(codec_id_str: str, payload_module: str) -> Codec[Any]: def _class_to_id(codec: Any) -> str: + explicit_id = getattr(codec, "CODEC_ID", None) + if explicit_id is not None: + if not isinstance(explicit_id, str): + raise TypeError(f"Codec CODEC_ID must be str, got {type(explicit_id).__name__}") + return explicit_id name = type(codec).__name__ if name.endswith("Codec"): return name[:-5].lower() @@ -101,6 +106,10 @@ def _make_one(name: str, payload_module: str, inner: Codec[Any] | None = None) - from dimos.memory2.codecs.jpeg import JpegCodec return JpegCodec() + if name == "h264": + from dimos.memory2.video.h264 import H264ImageCodec + + return H264ImageCodec() if name == "lcm": from dimos.memory2.codecs.lcm import LcmCodec diff --git a/dimos/memory2/video/h264.py b/dimos/memory2/video/h264.py new file mode 100644 index 0000000000..d833cebc2c --- /dev/null +++ b/dimos/memory2/video/h264.py @@ -0,0 +1,51 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +from dimos.msgs.sensor_msgs.Image import H264_IMAGE_ENCODING, Image + + +class H264ImageCodec: + """memory2 codec for already-H.264 encoded Image payloads. + + This codec deliberately does not decode pixels. It persists an ``Image`` whose + ``encoding`` is ``"h264"`` and restores the same encoded image on read. A + separate H.264 decode session turns the encoded stream back into raw Images + for visualization or module consumption. + """ + + CODEC_ID = "h264" + + def encode(self, value: Image) -> bytes: + if value.encoding != H264_IMAGE_ENCODING: + raise ValueError( + f"H264ImageCodec stores encoded Images; got encoding={value.encoding!r}" + ) + return value.lcm_encode() + + def decode(self, data: bytes) -> Image: + image = Image.lcm_decode(data) + if image.encoding != H264_IMAGE_ENCODING: + raise ValueError( + f"H264ImageCodec expected encoded Image; got encoding={image.encoding!r}" + ) + return image + + +def is_h264_image(image: Image) -> bool: + return image.encoding == H264_IMAGE_ENCODING + + +__all__ = ["H264ImageCodec", "is_h264_image"] diff --git a/dimos/memory2/video/test_h264_storage.py b/dimos/memory2/video/test_h264_storage.py new file mode 100644 index 0000000000..74eafb2b7a --- /dev/null +++ b/dimos/memory2/video/test_h264_storage.py @@ -0,0 +1,138 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +from pathlib import Path +import platform + +import numpy as np +import pytest + +from dimos.memory2.backend import Backend +from dimos.memory2.codecs.base import codec_from_id, codec_id +from dimos.memory2.codecs.jpeg import JpegCodec +from dimos.memory2.store.sqlite import SqliteStore +from dimos.memory2.video.h264 import H264ImageCodec +from dimos.msgs.sensor_msgs.Image import H264_IMAGE_ENCODING, Image, ImageFormat + +_SKIP_SQLITE_VEC = platform.machine() == "aarch64" or platform.system() == "Darwin" + + +def _raw_image(seq: int, fmt: ImageFormat = ImageFormat.RGB) -> Image: + data = np.full((2, 2, 3), seq, dtype=np.uint8) + if fmt == ImageFormat.GRAY: + data = np.full((2, 2), seq, dtype=np.uint8) + return Image.from_numpy(data, format=fmt, frame_id="cam", ts=float(seq)) + + +def _encoded_image(seq: int, *, key: bool = True) -> Image: + return Image.encoded( + data=b"\x00\x00\x00\x01\x65" + bytes([seq]), + encoding=H264_IMAGE_ENCODING, + format=ImageFormat.RGB, + frame_id="cam", + ts=float(seq), + codec_metadata={ + "seq": seq, + "codec": "h264", + "bitstream": "annex_b", + "is_keyframe": key, + "keyframe_seq": seq if key else 0, + "pts": seq * 90, + "width": 2, + "height": 2, + "channels": 3, + "dtype": "uint8", + }, + ) + + +def test_h264_image_codec_roundtrips_encoded_image() -> None: + codec = H264ImageCodec() + image = _encoded_image(1) + + decoded = codec.decode(codec.encode(image)) + + assert decoded == image + assert decoded.encoding == H264_IMAGE_ENCODING + assert decoded.codec_metadata["seq"] == 1 + assert decoded.width == 2 + assert decoded.height == 2 + + +def test_h264_image_codec_rejects_raw_images() -> None: + codec = H264ImageCodec() + + with pytest.raises(ValueError, match="encoded Images"): + codec.encode(_raw_image(1)) + + +def test_codec_id_and_factory_support_h264_for_image() -> None: + codec = H264ImageCodec() + + assert codec_id(codec) == "h264" + assert isinstance(codec_from_id("h264", "dimos.msgs.sensor_msgs.Image.Image"), H264ImageCodec) + + +def test_h264_stream_stores_encoded_images_with_normal_backend(tmp_path: Path) -> None: + if _SKIP_SQLITE_VEC: + pytest.skip("sqlite-vec extension not loadable here") + db = tmp_path / "h264.db" + with SqliteStore(path=str(db)) as store: + stream = store.stream("cam", Image, codec="h264") + stored = stream.append(_encoded_image(1), ts=1.0) + assert stored.data.encoding == H264_IMAGE_ENCODING + assert stored.data.codec_metadata["seq"] == 1 + + with SqliteStore(path=str(db), must_exist=True) as reopened: + stream = reopened.stream("cam", Image) + obs = stream.first() + assert obs.data.encoding == H264_IMAGE_ENCODING + assert obs.data.codec_metadata["seq"] == 1 + assert obs.data.width == 2 + + +def test_h264_replay_emits_encoded_images(tmp_path: Path) -> None: + if _SKIP_SQLITE_VEC: + pytest.skip("sqlite-vec extension not loadable here") + store = SqliteStore(path=str(tmp_path / "replay.db")) + stream = store.stream("cam", Image, codec="h264") + stream.append(_encoded_image(1), ts=1.0) + stream.append(_encoded_image(2, key=False), ts=2.0) + + replayed = list(store.replay().streams.cam.iterate()) + + assert [image.encoding for image in replayed] == [H264_IMAGE_ENCODING, H264_IMAGE_ENCODING] + assert [image.codec_metadata["seq"] for image in replayed] == [1, 2] + + +def test_default_image_stream_still_uses_jpeg_codec(tmp_path: Path) -> None: + if _SKIP_SQLITE_VEC: + pytest.skip("sqlite-vec extension not loadable here") + store = SqliteStore(path=str(tmp_path / "jpeg.db")) + stream = store.stream("rgb", Image) + + source = stream._source + assert isinstance(source, Backend) + assert isinstance(source.codec, JpegCodec) + + +def test_encoded_images_reject_pixel_operations() -> None: + image = _encoded_image(1) + + with pytest.raises(ValueError, match="requires raw Image data"): + image.to_rgb() + with pytest.raises(ValueError, match="requires raw Image data"): + image.as_numpy() diff --git a/dimos/memory2/vis/utils.py b/dimos/memory2/vis/utils.py index fee6f66057..ff8f90fbed 100644 --- a/dimos/memory2/vis/utils.py +++ b/dimos/memory2/vis/utils.py @@ -65,7 +65,7 @@ def mosaic( canvas = np.zeros((rows * cell_height, cols * cell_w, 3), dtype=np.uint8) for i, img in enumerate(images): r, c = divmod(i, cols) - tile = cv2.resize(img.to_bgr().data, (cell_w, cell_height)) + tile = cv2.resize(img.to_bgr().require_raw("mosaic_observations"), (cell_w, cell_height)) canvas[r * cell_height : (r + 1) * cell_height, c * cell_w : (c + 1) * cell_w] = tile result = Image(data=canvas, format=ImageFormat.BGR) diff --git a/dimos/models/vl/florence.py b/dimos/models/vl/florence.py index 8e964bb85f..b1aa56bc16 100644 --- a/dimos/models/vl/florence.py +++ b/dimos/models/vl/florence.py @@ -98,7 +98,7 @@ def caption(self, image: Image, detail: str | CaptionDetail | None = None) -> st task_prompt = CaptionDetail.from_str(detail).value # Convert to PIL - pil_image = PILImage.fromarray(image.to_rgb().data) + pil_image = PILImage.fromarray(image.to_rgb().require_raw("Florence2Model.caption")) # Process inputs inputs = self._processor(text=task_prompt, images=pil_image, return_tensors="pt") @@ -137,7 +137,10 @@ def caption_batch(self, *images: Image) -> list[str]: task_prompt = self._task_prompt # Convert all to PIL - pil_images = [PILImage.fromarray(img.to_rgb().data) for img in images] + pil_images = [ + PILImage.fromarray(img.to_rgb().require_raw("Florence2Model.caption_batch")) + for img in images + ] # Process batch inputs = self._processor( diff --git a/dimos/models/vl/moondream.py b/dimos/models/vl/moondream.py index e3cfe744ce..b7c1a0cc25 100644 --- a/dimos/models/vl/moondream.py +++ b/dimos/models/vl/moondream.py @@ -67,7 +67,7 @@ def _to_pil(self, image: Image | np.ndarray[Any, Any]) -> PILImage.Image: image, _ = self._prepare_image(image) rgb_image = image.to_rgb() - return PILImage.fromarray(rgb_image.data) + return PILImage.fromarray(rgb_image.require_raw("MoondreamVlModel._to_pil")) def query(self, image: Image | np.ndarray, query: str, **kwargs) -> str: # type: ignore[no-untyped-def] pil_image = self._to_pil(image) diff --git a/dimos/models/vl/moondream_hosted.py b/dimos/models/vl/moondream_hosted.py index 76e55451a1..2a6f81977d 100644 --- a/dimos/models/vl/moondream_hosted.py +++ b/dimos/models/vl/moondream_hosted.py @@ -54,7 +54,7 @@ def _to_pil_image(self, image: Image | np.ndarray) -> PILImage.Image: image = Image.from_numpy(image) rgb_image = image.to_rgb() - return PILImage.fromarray(rgb_image.data) + return PILImage.fromarray(rgb_image.require_raw("MoondreamHostedVlModel._to_pil_image")) def query(self, image: Image | np.ndarray, query: str, **kwargs) -> str: # type: ignore[no-untyped-def] pil_image = self._to_pil_image(image) diff --git a/dimos/msgs/sensor_msgs/Image.py b/dimos/msgs/sensor_msgs/Image.py index 5eaca03886..30c7aeb158 100644 --- a/dimos/msgs/sensor_msgs/Image.py +++ b/dimos/msgs/sensor_msgs/Image.py @@ -17,6 +17,8 @@ import base64 from dataclasses import dataclass, field from enum import Enum +import json +import struct import time from typing import TYPE_CHECKING, Any, Literal, TypedDict import warnings @@ -82,41 +84,85 @@ class AgentImageMessage(TypedDict): data: str # Base64 encoded image data +RAW_IMAGE_ENCODING = "raw" +H264_IMAGE_ENCODING = "h264" +_ENCODED_IMAGE_MAGIC = b"DIMI1" + + +def _data_equal(left: np.ndarray[Any, np.dtype[Any]] | bytes, right: object) -> bool: + if isinstance(left, np.ndarray) and isinstance(right, np.ndarray): + return bool(np.array_equal(left, right)) + if isinstance(left, bytes) and isinstance(right, bytes): + return left == right + return False + + +def _pack_encoded_image_payload(metadata: dict[str, Any], payload: bytes) -> bytes: + header = json.dumps(metadata, sort_keys=True, separators=(",", ":")).encode("utf-8") + return _ENCODED_IMAGE_MAGIC + struct.pack(">I", len(header)) + header + payload + + +def _unpack_encoded_image_payload(payload: bytes) -> tuple[dict[str, Any], bytes]: + if not payload.startswith(_ENCODED_IMAGE_MAGIC): + return {}, payload + offset = len(_ENCODED_IMAGE_MAGIC) + header_len = struct.unpack(">I", payload[offset : offset + 4])[0] + header_start = offset + 4 + header_end = header_start + header_len + metadata = json.loads(payload[header_start:header_end].decode("utf-8")) + return metadata, payload[header_end:] + + @dataclass class Image(Timestamped): - """Simple NumPy-based image container.""" + """Image container for raw pixels or explicitly encoded image payloads.""" msg_name = "sensor_msgs.Image" - data: np.ndarray[Any, np.dtype[Any]] = field( + data: np.ndarray[Any, np.dtype[Any]] | bytes = field( default_factory=lambda: np.zeros((1, 1, 3), dtype=np.uint8) ) format: ImageFormat = field(default=ImageFormat.BGR) frame_id: str = field(default="") ts: float = field(default_factory=time.time) + encoding: str = field(default=RAW_IMAGE_ENCODING) + codec_metadata: dict[str, Any] = field(default_factory=dict) def __post_init__(self) -> None: - if not isinstance(self.data, np.ndarray): - self.data = np.asarray(self.data) - if self.data.ndim < 2: - raise ValueError("Image requires a 2D/3D NumPy array") + if self.encoding == RAW_IMAGE_ENCODING: + if not isinstance(self.data, np.ndarray): + self.data = np.asarray(self.data) + if not isinstance(self.data, np.ndarray): + raise TypeError("Raw Image payload must be a NumPy array") + arr: np.ndarray[Any, np.dtype[Any]] = self.data + if arr.ndim < 2: + raise ValueError("Image requires a 2D/3D NumPy array") + return + if isinstance(self.data, bytearray): + self.data = bytes(self.data) + elif not isinstance(self.data, bytes): + self.data = memoryview(np.ascontiguousarray(self.data)).tobytes() + if not self.data: + raise ValueError("Encoded Image payload cannot be empty") def __str__(self) -> str: return ( f"Image(shape={self.shape}, format={self.format.value}, dtype={self.dtype}, " - f"ts={to_human_readable(self.ts)})" + f"encoding={self.encoding}, ts={to_human_readable(self.ts)})" ) def __repr__(self) -> str: - return f"Image(shape={self.shape}, format={self.format.value}, dtype={self.dtype}, frame_id='{self.frame_id}', ts={self.ts})" + return f"Image(shape={self.shape}, format={self.format.value}, dtype={self.dtype}, encoding='{self.encoding}', frame_id='{self.frame_id}', ts={self.ts})" def __eq__(self, other: object) -> bool: if not isinstance(other, Image): return False return ( - np.array_equal(self.data, other.data) + _data_equal(self.data, other.data) and self.format == other.format and self.frame_id == other.frame_id + and self.encoding == other.encoding + and self.codec_metadata == other.codec_metadata and abs(self.ts - other.ts) < 1e-6 ) @@ -124,40 +170,103 @@ def __len__(self) -> int: return int(self.height * self.width) def __getstate__(self) -> dict[str, Any]: - return {"data": self.data, "format": self.format, "frame_id": self.frame_id, "ts": self.ts} + return { + "data": self.data, + "format": self.format, + "frame_id": self.frame_id, + "ts": self.ts, + "encoding": self.encoding, + "codec_metadata": self.codec_metadata, + } def __setstate__(self, state: dict[str, Any]) -> None: self.data = state.get("data", np.zeros((1, 1, 3), dtype=np.uint8)) self.format = state.get("format", ImageFormat.BGR) self.frame_id = state.get("frame_id", "") self.ts = state.get("ts", time.time()) + self.encoding = state.get("encoding", RAW_IMAGE_ENCODING) + self.codec_metadata = state.get("codec_metadata", {}) + + @property + def is_raw(self) -> bool: + return self.encoding == RAW_IMAGE_ENCODING + + @property + def is_encoded(self) -> bool: + return not self.is_raw + + def require_raw(self, operation: str = "operation") -> np.ndarray[Any, np.dtype[Any]]: + if self.encoding != RAW_IMAGE_ENCODING: + raise ValueError(f"{operation} requires raw Image data; got encoding={self.encoding!r}") + assert isinstance(self.data, np.ndarray) + return self.data @property def height(self) -> int: - return int(self.data.shape[0]) + if self.is_encoded: + return int(self.codec_metadata.get("height", 0)) + arr = self.require_raw("height") + return int(arr.shape[0]) @property def width(self) -> int: - return int(self.data.shape[1]) + if self.is_encoded: + return int(self.codec_metadata.get("width", 0)) + arr = self.require_raw("width") + return int(arr.shape[1]) @property def channels(self) -> int: - if self.data.ndim == 2: + if self.is_encoded: + if "channels" in self.codec_metadata: + return int(self.codec_metadata["channels"]) + if self.format in ( + ImageFormat.GRAY, + ImageFormat.GRAY16, + ImageFormat.DEPTH, + ImageFormat.DEPTH16, + ): + return 1 + if self.format in (ImageFormat.RGBA, ImageFormat.BGRA): + return 4 + return 3 + arr = self.require_raw("channels") + if arr.ndim == 2: return 1 - if self.data.ndim == 3: - return int(self.data.shape[2]) + if arr.ndim == 3: + return int(arr.shape[2]) raise ValueError("Invalid image dimensions") @property def shape(self) -> tuple[int, ...]: - return tuple(self.data.shape) + if self.is_encoded: + if self.channels == 1: + return (self.height, self.width) + return (self.height, self.width, self.channels) + return tuple(self.require_raw("shape").shape) @property def dtype(self) -> np.dtype[Any]: - return self.data.dtype + if self.is_encoded: + return np.dtype(str(self.codec_metadata.get("dtype", "uint8"))) + return self.require_raw("dtype").dtype def copy(self) -> Image: - return Image(data=self.data.copy(), format=self.format, frame_id=self.frame_id, ts=self.ts) + data: np.ndarray[Any, np.dtype[Any]] | bytes + if self.is_encoded: + if not isinstance(self.data, bytes): + raise ValueError("Encoded Image payload must be bytes") + data = bytes(self.data) + else: + data = self.require_raw("copy").copy() + return Image( + data=data, + format=self.format, + frame_id=self.frame_id, + ts=self.ts, + encoding=self.encoding, + codec_metadata=dict(self.codec_metadata), + ) @classmethod def from_numpy( @@ -174,6 +283,27 @@ def from_numpy( ts=ts if ts is not None else time.time(), ) + @classmethod + def encoded( + cls, + *, + data: bytes, + encoding: str, + format: ImageFormat, + frame_id: str = "", + ts: float | None = None, + codec_metadata: dict[str, Any] | None = None, + ) -> Image: + metadata = dict(codec_metadata or {}) + return cls( + data=data, + format=format, + frame_id=frame_id, + ts=ts if ts is not None else time.time(), + encoding=encoding, + codec_metadata=metadata, + ) + @classmethod def from_file( cls, @@ -211,7 +341,7 @@ def from_opencv( def to_opencv(self) -> np.ndarray: """Convert to OpenCV BGR format.""" - arr = self.data + arr = self.require_raw("to_opencv") if self.format == ImageFormat.BGR: return arr if self.format == ImageFormat.RGB: @@ -231,12 +361,12 @@ def to_opencv(self) -> np.ndarray: def as_numpy(self) -> np.ndarray: """Get image data as numpy array.""" - return self.data + return self.require_raw("as_numpy") def to_rgb(self) -> Image: + arr = self.require_raw("to_rgb") if self.format == ImageFormat.RGB: return self.copy() - arr = self.data if self.format == ImageFormat.BGR: return Image( data=cv2.cvtColor(arr, cv2.COLOR_BGR2RGB), @@ -267,9 +397,9 @@ def to_rgb(self) -> Image: return self.copy() def to_bgr(self) -> Image: + arr = self.require_raw("to_bgr") if self.format == ImageFormat.BGR: return self.copy() - arr = self.data if self.format == ImageFormat.RGB: return Image( data=cv2.cvtColor(arr, cv2.COLOR_RGB2BGR), @@ -304,18 +434,19 @@ def to_bgr(self) -> Image: return self.copy() def to_grayscale(self) -> Image: + arr = self.require_raw("to_grayscale") if self.format in (ImageFormat.GRAY, ImageFormat.GRAY16, ImageFormat.DEPTH): return self.copy() if self.format == ImageFormat.BGR: return Image( - data=cv2.cvtColor(self.data, cv2.COLOR_BGR2GRAY), + data=cv2.cvtColor(arr, cv2.COLOR_BGR2GRAY), format=ImageFormat.GRAY, frame_id=self.frame_id, ts=self.ts, ) if self.format == ImageFormat.RGB: return Image( - data=cv2.cvtColor(self.data, cv2.COLOR_RGB2GRAY), + data=cv2.cvtColor(arr, cv2.COLOR_RGB2GRAY), format=ImageFormat.GRAY, frame_id=self.frame_id, ts=self.ts, @@ -323,7 +454,7 @@ def to_grayscale(self) -> Image: if self.format in (ImageFormat.RGBA, ImageFormat.BGRA): code = cv2.COLOR_RGBA2GRAY if self.format == ImageFormat.RGBA else cv2.COLOR_BGRA2GRAY return Image( - data=cv2.cvtColor(self.data, code), + data=cv2.cvtColor(arr, code), format=ImageFormat.GRAY, frame_id=self.frame_id, ts=self.ts, @@ -332,11 +463,13 @@ def to_grayscale(self) -> Image: def to_rerun(self) -> Any: """Convert to rerun Image format.""" - return _format_to_rerun(self.data, self.format) + return _format_to_rerun(self.require_raw("to_rerun"), self.format) def resize(self, width: int, height: int, interpolation: int = cv2.INTER_LINEAR) -> Image: return Image( - data=cv2.resize(self.data, (width, height), interpolation=interpolation), + data=cv2.resize( + self.require_raw("resize"), (width, height), interpolation=interpolation + ), format=self.format, frame_id=self.frame_id, ts=self.ts, @@ -372,7 +505,8 @@ def crop(self, x: int, y: int, width: int, height: int) -> Image: Returns: A new Image containing the cropped region """ - img_height, img_width = self.data.shape[:2] + arr = self.require_raw("crop") + img_height, img_width = arr.shape[:2] # Clamp the crop region to image bounds x = max(0, min(x, img_width)) @@ -381,10 +515,10 @@ def crop(self, x: int, y: int, width: int, height: int) -> Image: y_end = min(y + height, img_height) # Perform the crop using array slicing - if self.data.ndim == 2: - cropped_data = self.data[y:y_end, x:x_end] + if arr.ndim == 2: + cropped_data = arr[y:y_end, x:x_end] else: - cropped_data = self.data[y:y_end, x:x_end, :] + cropped_data = arr[y:y_end, x:x_end, :] return Image(data=cropped_data, format=self.format, frame_id=self.frame_id, ts=self.ts) @@ -396,8 +530,9 @@ def brightness(self) -> float: reading every pixel, and the mean converges quickly (CLT). """ max_val = 65535.0 if self.format in (ImageFormat.GRAY16, ImageFormat.DEPTH16) else 255.0 - step = max(1, max(self.data.shape[:2]) // 256) - return float(self.data[::step, ::step].mean() / max_val) + arr = self.require_raw("brightness") + step = max(1, max(arr.shape[:2]) // 256) + return float(arr[::step, ::step].mean() / max_val) @property def sharpness(self) -> float: @@ -406,7 +541,7 @@ def sharpness(self) -> float: Downsamples to ~160px wide before computing Laplacian variance for fast evaluation (~10-20x cheaper than full-res Sobel). """ - gray = self.to_grayscale().data + gray = self.to_grayscale().require_raw("sharpness") # Downsample to ~160px wide for cheap evaluation h, w = gray.shape[:2] if w > 160: @@ -486,6 +621,31 @@ def lcm_encode(self, frame_id: str | None = None) -> bytes: msg.header.stamp.sec = int(now) msg.header.stamp.nsec = int((now - int(now)) * 1e9) + if self.is_encoded: + if not isinstance(self.data, bytes): + raise ValueError("Encoded Image payload must be bytes") + codec_metadata = dict(self.codec_metadata) + codec_metadata.setdefault("width", self.width) + codec_metadata.setdefault("height", self.height) + codec_metadata.setdefault("channels", self.channels) + codec_metadata.setdefault("dtype", str(self.dtype)) + metadata = { + "format": self.format.value, + "encoding": self.encoding, + "codec_metadata": codec_metadata, + } + packed = _pack_encoded_image_payload(metadata, self.data) + msg.height = self.height + msg.width = self.width + msg.encoding = self.encoding + msg.is_bigendian = False + msg.step = 0 + msg.data_length = len(packed) + msg.data = packed + return msg.lcm_encode() # type: ignore[no-any-return] + + arr = self.require_raw("lcm_encode") + # Image properties msg.height = self.height msg.width = self.width @@ -493,10 +653,10 @@ def lcm_encode(self, frame_id: str | None = None) -> bytes: msg.is_bigendian = False # Calculate step (bytes per row) - channels = 1 if self.data.ndim == 2 else self.data.shape[2] + channels = 1 if arr.ndim == 2 else arr.shape[2] msg.step = self.width * self.dtype.itemsize * channels - view = memoryview(np.ascontiguousarray(self.data)).cast("B") # type: ignore[arg-type] + view = memoryview(np.ascontiguousarray(arr)).cast("B") # type: ignore[arg-type] msg.data_length = len(view) msg.data = view @@ -509,6 +669,26 @@ def lcm_decode(cls, data: bytes, **kwargs: Any) -> Image: # JPEG-compressed images use a different decode path. if msg.encoding == "jpeg": return cls.lcm_jpeg_decode(data, **kwargs) + if msg.encoding == H264_IMAGE_ENCODING: + metadata, payload = _unpack_encoded_image_payload(bytes(msg.data)) + codec_metadata = dict(metadata.get("codec_metadata", {})) + codec_metadata.setdefault("width", msg.width) + codec_metadata.setdefault("height", msg.height) + image_format = ImageFormat(metadata.get("format", ImageFormat.RGB.value)) + return cls.encoded( + data=payload, + encoding=H264_IMAGE_ENCODING, + format=image_format, + frame_id=msg.header.frame_id if hasattr(msg, "header") else "", + ts=( + msg.header.stamp.sec + msg.header.stamp.nsec / 1e9 + if hasattr(msg, "header") + and hasattr(msg.header, "stamp") + and msg.header.stamp.sec > 0 + else time.time() + ), + codec_metadata=codec_metadata, + ) fmt, dtype, channels = _parse_lcm_encoding(msg.encoding) arr: np.ndarray[Any, Any] = np.frombuffer(msg.data, dtype=dtype) @@ -542,7 +722,7 @@ def to_jpeg_bytes(self, quality: int = 75) -> bytes: jpeg = TurboJPEG() # Canonicalize to RGB so JPEG bytes are deterministic regardless of input format. - rgb_array = self.to_rgb().data + rgb_array = self.to_rgb().require_raw("to_jpeg_bytes") return jpeg.encode(rgb_array, quality=quality, pixel_format=TJPF_RGB) # type: ignore[no-any-return] def lcm_jpeg_encode(self, quality: int = 75, frame_id: str | None = None) -> bytes: @@ -620,6 +800,8 @@ def lcm_jpeg_decode(cls, data: bytes, **kwargs: Any) -> Image: __all__ = [ + "H264_IMAGE_ENCODING", + "RAW_IMAGE_ENCODING", "Image", "ImageFormat", "sharpness_barrier", diff --git a/dimos/msgs/sensor_msgs/PointCloud2.py b/dimos/msgs/sensor_msgs/PointCloud2.py index ae30c41711..46396ac15d 100644 --- a/dimos/msgs/sensor_msgs/PointCloud2.py +++ b/dimos/msgs/sensor_msgs/PointCloud2.py @@ -223,13 +223,13 @@ def from_rgbd( PointCloud2 instance with colored points """ # Get color as RGB numpy array - color_data = color_image.to_rgb().data + color_data = color_image.to_rgb().require_raw("PointCloud2.from_rgbd color") if hasattr(color_data, "get"): # CuPy array color_data = color_data.get() color_data = np.ascontiguousarray(color_data) # Get depth numpy array - depth_data = depth_image.data + depth_data = depth_image.require_raw("PointCloud2.from_rgbd depth") if hasattr(depth_data, "get"): # CuPy array depth_data = depth_data.get() diff --git a/dimos/msgs/sensor_msgs/test_image.py b/dimos/msgs/sensor_msgs/test_image.py index d679c6cb69..354a52b78d 100644 --- a/dimos/msgs/sensor_msgs/test_image.py +++ b/dimos/msgs/sensor_msgs/test_image.py @@ -69,6 +69,18 @@ def test_opencv_conversion(img: Image) -> None: assert decoded_img == img +def test_eager_image_compatibility() -> None: + data = np.ones((2, 3, 3), dtype=np.uint8) + img = Image(data=data, format=ImageFormat.BGR, frame_id="cam", ts=11.0) + + assert img.data is data + assert img.height == 2 + assert img.width == 3 + assert img.channels == 3 + assert img.shape == (2, 3, 3) + assert img.dtype == np.dtype(np.uint8) + + @pytest.mark.tool def test_sharpness_stream() -> None: get_data("unitree_office_walk") # Preload data for testing diff --git a/dimos/perception/common/utils.py b/dimos/perception/common/utils.py index 20ffc2a254..f8e1a0a824 100644 --- a/dimos/perception/common/utils.py +++ b/dimos/perception/common/utils.py @@ -195,7 +195,7 @@ def rectify_image(image: Image, camera_matrix: np.ndarray, dist_coeffs: np.ndarr Returns an Image with numpy or cupy data depending on caller choice. """ - rect = cv2.undistort(image.data, camera_matrix, dist_coeffs) + rect = cv2.undistort(image.require_raw("rectify_image"), camera_matrix, dist_coeffs) return Image(data=rect, format=image.format, frame_id=image.frame_id, ts=image.ts) diff --git a/dimos/perception/experimental/temporal_memory/clip_filter.py b/dimos/perception/experimental/temporal_memory/clip_filter.py index 6ef7859e17..63cf9802ae 100644 --- a/dimos/perception/experimental/temporal_memory/clip_filter.py +++ b/dimos/perception/experimental/temporal_memory/clip_filter.py @@ -38,7 +38,7 @@ def _get_image_data(image: Image) -> np.ndarray[Any, Any]: """Extract numpy array from Image.""" if not hasattr(image, "data"): raise AttributeError(f"Image missing .data attribute: {type(image)}") - return image.data + return image.require_raw("_get_image_data") if CLIP_AVAILABLE: diff --git a/dimos/perception/experimental/temporal_memory/temporal_utils/helpers.py b/dimos/perception/experimental/temporal_memory/temporal_utils/helpers.py index 88ddee1157..d6e95e20d8 100644 --- a/dimos/perception/experimental/temporal_memory/temporal_utils/helpers.py +++ b/dimos/perception/experimental/temporal_memory/temporal_utils/helpers.py @@ -70,5 +70,7 @@ def is_scene_stale(frames: list["Frame"], stale_threshold: float = 5.0) -> bool: return False if not hasattr(first_img, "data") or not hasattr(last_img, "data"): return False - diff = np.abs(first_img.data.astype(float) - last_img.data.astype(float)) + first_data = first_img.require_raw("is_scene_stale first frame") + last_data = last_img.require_raw("is_scene_stale last frame") + diff = np.abs(first_data.astype(float) - last_data.astype(float)) return bool(diff.mean() < stale_threshold) diff --git a/dimos/perception/object_tracker_2d.py b/dimos/perception/object_tracker_2d.py index 653c519054..d527e025ce 100644 --- a/dimos/perception/object_tracker_2d.py +++ b/dimos/perception/object_tracker_2d.py @@ -93,7 +93,7 @@ def start(self) -> None: def on_frame(frame_msg: Image) -> None: arrival_time = time.perf_counter() with self._frame_lock: - self._latest_rgb_frame = frame_msg.data + self._latest_rgb_frame = frame_msg.require_raw("ObjectTracker2D.on_frame") self._frame_arrival_time = arrival_time unsub = self.color_image.subscribe(on_frame) diff --git a/dimos/perception/spatial_perception.py b/dimos/perception/spatial_perception.py index 4d1f1377f3..4b3f601a1c 100644 --- a/dimos/perception/spatial_perception.py +++ b/dimos/perception/spatial_perception.py @@ -190,7 +190,7 @@ def start(self) -> None: def set_video(image_msg: Image) -> None: # Convert Image message to numpy array if hasattr(image_msg, "data"): - frame = image_msg.data + frame = image_msg.require_raw("SpatialMemory.set_video") frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR) self._latest_video_frame = frame else: diff --git a/dimos/protocol/pubsub/impl/h264_lcm.py b/dimos/protocol/pubsub/impl/h264_lcm.py new file mode 100644 index 0000000000..8784e1d93b --- /dev/null +++ b/dimos/protocol/pubsub/impl/h264_lcm.py @@ -0,0 +1,71 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""H.264-encoded Image transport over LCM.""" + +from __future__ import annotations + +from dimos.msgs.sensor_msgs.Image import Image +from dimos.protocol.pubsub.encoders import DecodingError, LCMTopicProto, PubSubEncoderMixin +from dimos.protocol.pubsub.impl.lcmpubsub import LCMPubSubBase +from dimos.protocol.video.h264 import H264Config, H264Decoder, H264Encoder, VideoDecodeGapError + + +class H264EncoderMixin(PubSubEncoderMixin[LCMTopicProto, Image, bytes]): + """Encoder mixin for Image streams using H.264 packets on the wire.""" + + def __init__( + self, + *, + config: H264Config | None = None, + decode_images: bool = True, + **kwargs: object, + ) -> None: + super().__init__(**kwargs) # type: ignore[misc] + self.h264_config = config or H264Config() + self.decode_images = decode_images + self._encoder: H264Encoder | None = None + self._decoder: H264Decoder | None = None + + def encode(self, msg: Image, topic: LCMTopicProto) -> bytes: + if self._encoder is None: + self._encoder = H264Encoder(self.h264_config) + return self._encoder.encode(msg).lcm_encode() + + def decode(self, msg: bytes, topic: LCMTopicProto) -> Image: + if topic.topic == "LCM_SELF_TEST": + raise DecodingError("Ignoring LCM_SELF_TEST topic") + if topic.lcm_type is not None and not issubclass(topic.lcm_type, Image): + raise DecodingError(f"H.264 LCM topic {topic.topic!r} is not typed as Image") + try: + image = Image.lcm_decode(msg) + except ValueError as exc: + raise DecodingError(str(exc)) from exc + if not self.decode_images: + return image + if self._decoder is None: + self._decoder = H264Decoder(self.h264_config) + try: + return self._decoder.decode(image) + except (VideoDecodeGapError, ValueError) as exc: + raise DecodingError(str(exc)) from exc + + +class H264LCM( # type: ignore[misc] + H264EncoderMixin, + LCMPubSubBase, +): ... + + +__all__ = ["H264LCM", "H264EncoderMixin"] diff --git a/dimos/protocol/pubsub/impl/test_h264_lcm.py b/dimos/protocol/pubsub/impl/test_h264_lcm.py new file mode 100644 index 0000000000..3b7987e0e4 --- /dev/null +++ b/dimos/protocol/pubsub/impl/test_h264_lcm.py @@ -0,0 +1,219 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +from collections.abc import Callable +from dataclasses import dataclass + +import numpy as np +import pytest + +from dimos.msgs.protocol import DimosMsg +from dimos.msgs.sensor_msgs.Image import H264_IMAGE_ENCODING, Image, ImageFormat +from dimos.protocol.pubsub.encoders import DecodingError, LCMTopicProto +from dimos.protocol.pubsub.impl.h264_lcm import H264LCM, H264EncoderMixin +from dimos.protocol.video.h264 import VideoDecodeGapError + + +@dataclass +class StubTopic: + topic: str + lcm_type: type[DimosMsg] | None = None + + +def _encoded(image: Image, *, seq: int = 0, key: bool = True) -> Image: + return Image.encoded( + data=b"\x00\x00\x00\x01\x65" if key else b"\x00\x00\x00\x01\x41", + encoding=H264_IMAGE_ENCODING, + format=image.format, + frame_id=image.frame_id, + ts=image.ts, + codec_metadata={ + "seq": seq, + "codec": "h264", + "bitstream": "annex_b", + "is_keyframe": key, + "keyframe_seq": seq if key else 0, + "pts": seq * 90, + "width": image.width, + "height": image.height, + "channels": image.channels, + "dtype": str(image.dtype), + }, + ) + + +class FakeEncoder: + def encode(self, image: Image) -> Image: + return _encoded(image) + + +class FakeDecoder: + def __init__(self, *, fail: bool = False, invalid: bool = False) -> None: + self.fail = fail + self.invalid = invalid + + def decode(self, image: Image) -> Image: + if self.fail: + raise VideoDecodeGapError("waiting for keyframe") + if self.invalid: + raise ValueError("Expected H.264 encoded Image") + return Image( + data=np.zeros((image.height, image.width, 3), dtype=np.uint8), + format=image.format, + frame_id=image.frame_id, + ts=image.ts, + ) + + +class InMemoryPubSubBase: + def __init__(self, **_: object) -> None: + self._subscribers: list[tuple[LCMTopicProto, Callable[[bytes, LCMTopicProto], None]]] = [] + + def publish(self, topic: LCMTopicProto, message: bytes) -> None: + for subscribed_topic, callback in self._subscribers: + if subscribed_topic.topic == topic.topic: + callback(message, topic) + + def subscribe( + self, topic: LCMTopicProto, callback: Callable[[bytes, LCMTopicProto], None] + ) -> Callable[[], None]: + item = (topic, callback) + self._subscribers.append(item) + + def unsubscribe() -> None: + self._subscribers.remove(item) + + return unsubscribe + + +class InMemoryH264PubSub(H264EncoderMixin, InMemoryPubSubBase): # type: ignore[misc] + pass + + +def test_h264_lcm_encodes_image_as_h264_encoded_image_bytes() -> None: + transport = H264LCM() + transport._encoder = FakeEncoder() # type: ignore[assignment] + image = Image(data=np.zeros((2, 3, 3), dtype=np.uint8), format=ImageFormat.RGB, frame_id="cam") + + payload = transport.encode(image, StubTopic("/color", Image)) + encoded = Image.lcm_decode(payload) + + assert encoded.encoding == H264_IMAGE_ENCODING + assert encoded.codec_metadata["codec"] == "h264" + assert encoded.codec_metadata["bitstream"] == "annex_b" + assert encoded.width == 3 + assert encoded.height == 2 + assert encoded.codec_metadata["is_keyframe"] is True + + +def test_h264_lcm_decodes_h264_image_bytes_to_raw_image() -> None: + transport = H264LCM() + transport._decoder = FakeDecoder() # type: ignore[assignment] + encoded = FakeEncoder().encode( + Image(data=np.zeros((2, 3, 3), dtype=np.uint8), format=ImageFormat.RGB, frame_id="cam") + ) + + image = transport.decode(encoded.lcm_encode(), StubTopic("/color", Image)) + + assert image.encoding == "raw" + assert image.frame_id == "cam" + assert image.shape == (2, 3, 3) + + +def test_h264_lcm_decode_false_returns_encoded_image() -> None: + transport = H264LCM(decode_images=False) + encoded = FakeEncoder().encode( + Image(data=np.zeros((2, 3, 3), dtype=np.uint8), format=ImageFormat.RGB, frame_id="cam") + ) + + image = transport.decode(encoded.lcm_encode(), StubTopic("/color", Image)) + + assert image.encoding == H264_IMAGE_ENCODING + assert image.frame_id == "cam" + + +def test_h264_lcm_suppresses_decode_gap() -> None: + transport = H264LCM() + transport._decoder = FakeDecoder(fail=True) # type: ignore[assignment] + encoded = FakeEncoder().encode( + Image(data=np.zeros((2, 3, 3), dtype=np.uint8), format=ImageFormat.RGB, frame_id="cam") + ) + + with pytest.raises(DecodingError, match="waiting for keyframe"): + transport.decode(encoded.lcm_encode(), StubTopic("/color", Image)) + + +def test_h264_lcm_suppresses_invalid_h264_image() -> None: + transport = H264LCM() + transport._decoder = FakeDecoder(invalid=True) # type: ignore[assignment] + encoded = FakeEncoder().encode( + Image(data=np.zeros((2, 3, 3), dtype=np.uint8), format=ImageFormat.RGB, frame_id="cam") + ) + + with pytest.raises(DecodingError, match="Expected H.264 encoded Image"): + transport.decode(encoded.lcm_encode(), StubTopic("/color", Image)) + + +def test_h264_lcm_suppresses_non_image_payload() -> None: + transport = H264LCM() + + with pytest.raises(DecodingError): + transport.decode(b"not-an-image", StubTopic("/color", Image)) + + +def test_h264_lcm_publish_subscribe_delivers_decoded_image() -> None: + transport = InMemoryH264PubSub() + transport._encoder = FakeEncoder() # type: ignore[assignment] + transport._decoder = FakeDecoder() # type: ignore[assignment] + topic = StubTopic("/color", Image) + received: list[Image] = [] + + transport.subscribe(topic, lambda image, _topic: received.append(image)) + transport.publish( + topic, + Image(data=np.zeros((2, 3, 3), dtype=np.uint8), format=ImageFormat.RGB, frame_id="cam"), + ) + + assert len(received) == 1 + assert received[0].frame_id == "cam" + assert received[0].shape == (2, 3, 3) + + +def test_h264_lcm_late_subscriber_waits_for_keyframe() -> None: + transport = InMemoryH264PubSub() + topic = StubTopic("/color", Image) + received: list[Image] = [] + decoder = FakeDecoder(fail=True) + transport._decoder = decoder # type: ignore[assignment] + + transport.subscribe(topic, lambda image, _topic: received.append(image)) + delta = _encoded( + Image(data=np.zeros((2, 3, 3), dtype=np.uint8), format=ImageFormat.RGB, frame_id="cam"), + seq=1, + key=False, + ) + InMemoryPubSubBase.publish(transport, topic, delta.lcm_encode()) + + decoder.fail = False + keyframe = _encoded( + Image(data=np.zeros((2, 3, 3), dtype=np.uint8), format=ImageFormat.RGB, frame_id="cam"), + seq=2, + key=True, + ) + InMemoryPubSubBase.publish(transport, topic, keyframe.lcm_encode()) + + assert len(received) == 1 + assert received[0].frame_id == "cam" diff --git a/dimos/protocol/pubsub/registry.py b/dimos/protocol/pubsub/registry.py index 82afd2cef6..9bbb47f5e9 100644 --- a/dimos/protocol/pubsub/registry.py +++ b/dimos/protocol/pubsub/registry.py @@ -22,13 +22,13 @@ :[#] -- ````: registry key, e.g. ``lcm``, ``jpeg_lcm``, ``plcm``, ``pshm``, - ``shm``, ``jpeg_shm``. +- ````: registry key, e.g. ``lcm``, ``jpeg_lcm``, ``h264_lcm``, + ``plcm``, ``pshm``, ``shm``, ``jpeg_shm``. - ````: channel/key, passed verbatim to the transport constructor. - ````: optional ``module.ClassName`` resolved via ``dimos.msgs.helpers.resolve_msg_type`` (e.g. ``sensor_msgs.Image``). -Typed protos (``lcm``, ``jpeg_lcm``) require a message type — either from the +Typed protos (``lcm``, ``jpeg_lcm``, ``h264_lcm``) require a message type — either from the ``#``-suffix or the ``msg_type`` kwarg. Pickled / self-describing protos (``plcm``, ``pshm``, ``shm``, ``jpeg_shm``) don't. """ @@ -61,6 +61,16 @@ def _make_jpeg_lcm(topic: str, msg_type: type | None) -> Any: return JpegLcmTransport(topic, msg_type) +def _make_h264_lcm(topic: str, msg_type: type | None) -> Any: + if msg_type is None: + raise ValueError( + "proto 'h264_lcm' requires a message type (URI '#suffix' or msg_type kwarg)" + ) + from dimos.core.transport import H264LcmTransport + + return H264LcmTransport(topic, msg_type) + + def _make_plcm(topic: str, msg_type: type | None) -> Any: # pickled LCM: receivers unpickle Python objects, no type registration needed. from dimos.core.transport import pLCMTransport @@ -92,6 +102,7 @@ def _make_jpeg_shm(topic: str, msg_type: type | None) -> Any: _REGISTRY: dict[str, Callable[[str, type | None], Any]] = { "lcm": _make_lcm, "jpeg_lcm": _make_jpeg_lcm, + "h264_lcm": _make_h264_lcm, "plcm": _make_plcm, "pshm": _make_pshm, "shm": _make_shm, diff --git a/dimos/protocol/pubsub/test_registry.py b/dimos/protocol/pubsub/test_registry.py index 9d7796c2ce..70d8a8f073 100644 --- a/dimos/protocol/pubsub/test_registry.py +++ b/dimos/protocol/pubsub/test_registry.py @@ -17,6 +17,7 @@ import pytest from dimos.core.transport import ( + H264LcmTransport, JpegLcmTransport, JpegShmTransport, LCMTransport, @@ -35,7 +36,15 @@ def test_supported_protos_includes_known_set() -> None: """Registry exposes the canonical proto names.""" - assert set(supported_protos()) >= {"lcm", "jpeg_lcm", "plcm", "pshm", "shm", "jpeg_shm"} + assert set(supported_protos()) >= { + "lcm", + "jpeg_lcm", + "h264_lcm", + "plcm", + "pshm", + "shm", + "jpeg_shm", + } @pytest.mark.parametrize( @@ -43,6 +52,7 @@ def test_supported_protos_includes_known_set() -> None: [ ("lcm:/color_image", ("lcm", "/color_image", None)), ("jpeg_lcm:/color_image", ("jpeg_lcm", "/color_image", None)), + ("h264_lcm:/color_image", ("h264_lcm", "/color_image", None)), ("pshm:color_image", ("pshm", "color_image", None)), ("shm:foo/bar", ("shm", "foo/bar", None)), ( @@ -53,6 +63,10 @@ def test_supported_protos_includes_known_set() -> None: "jpeg_lcm:/color_image#sensor_msgs.Image", ("jpeg_lcm", "/color_image", "sensor_msgs.Image"), ), + ( + "h264_lcm:/color_image#sensor_msgs.Image", + ("h264_lcm", "/color_image", "sensor_msgs.Image"), + ), ], ) def test_parse_pubsub_uri_happy_paths(uri: str, expected: tuple[str, str, str | None]) -> None: @@ -92,6 +106,11 @@ def test_make_pubsub_transport_jpeg_lcm_uses_JpegLcmTransport() -> None: assert isinstance(t, JpegLcmTransport) +def test_make_pubsub_transport_h264_lcm_uses_H264LcmTransport() -> None: + t = make_pubsub_transport("h264_lcm:/color_image", msg_type=Image) + assert isinstance(t, H264LcmTransport) + + def test_make_pubsub_transport_plcm_uses_pLCMTransport() -> None: t = make_pubsub_transport("plcm:/anything") assert isinstance(t, pLCMTransport) diff --git a/dimos/protocol/video/demo_h264_video_e2e.py b/dimos/protocol/video/demo_h264_video_e2e.py new file mode 100644 index 0000000000..14f0285024 --- /dev/null +++ b/dimos/protocol/video/demo_h264_video_e2e.py @@ -0,0 +1,686 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Synthetic end-to-end H.264 image transport and memory2 storage demo.""" + +from __future__ import annotations + +import os +from pathlib import Path +import sqlite3 +import tempfile +import threading +import time +from typing import ClassVar, cast + +import cv2 +import numpy as np + +from dimos.core.coordination.blueprints import autoconnect +from dimos.core.core import rpc +from dimos.core.module import Module, ModuleConfig +from dimos.core.stream import In, Out +from dimos.core.transport import H264LcmTransport +from dimos.hardware.sensors.camera.module import CameraModule +from dimos.hardware.sensors.camera.webcam import Webcam +from dimos.memory2.module import OnExisting, Recorder +from dimos.memory2.store.sqlite import SqliteStore +from dimos.memory2.stream import Stream +from dimos.msgs.sensor_msgs.Image import Image, ImageFormat +from dimos.protocol.pubsub.impl.h264_lcm import H264LCM +from dimos.protocol.video.h264 import H264Config, H264Decoder, VideoDecodeGapError +from dimos.utils.data import backup_file +from dimos.utils.logging_config import setup_logger +from dimos.visualization.vis_module import vis_module + +logger = setup_logger() + + +class SyntheticVideoSourceConfig(ModuleConfig): + width: int = 160 + height: int = 120 + fps: float = 10.0 + frame_count: int = 90 + output_frame_id: str = "h264_e2e_camera" + seed: int = 7 + + +class SyntheticVideoSource(Module): + """Deterministic RGB image source for H.264 transport/storage QA.""" + + config: SyntheticVideoSourceConfig + color_image: Out[Image] + + _thread: threading.Thread | None = None + _stop_event: threading.Event | None = None + + @rpc + def start(self) -> None: + super().start() + self._stop_event = threading.Event() + self._thread = threading.Thread(target=self._publish_loop, daemon=True) + self._thread.start() + logger.info( + "Started synthetic H.264 video source: %sx%s @ %.2f FPS for %s frames", + self.config.width, + self.config.height, + self.config.fps, + self.config.frame_count, + ) + + @rpc + def stop(self) -> None: + if self._stop_event is not None: + self._stop_event.set() + if self._thread is not None: + self._thread.join(timeout=2.0) + self._thread = None + super().stop() + + def _publish_loop(self) -> None: + assert self._stop_event is not None + period = 1.0 / max(self.config.fps, 0.1) + next_publish = time.monotonic() + for seq in range(self.config.frame_count): + if self._stop_event.is_set(): + break + frame = self._make_frame(seq) + self.color_image.publish(frame) + next_publish += period + time.sleep(max(0.0, next_publish - time.monotonic())) + logger.info("Synthetic H.264 source finished publishing frames") + + def _make_frame(self, seq: int) -> Image: + yy, xx = np.indices((self.config.height, self.config.width), dtype=np.uint16) + base = (xx + (yy * 3) + (seq * 5) + self.config.seed) % 256 + data = np.stack( + (base, (base + 85) % 256, (base + 170) % 256), + axis=2, + ).astype(np.uint8) + return Image( + data=data, + format=ImageFormat.RGB, + frame_id=self.config.output_frame_id, + ts=time.time(), + ) + + +class _H264RecorderMixin: + """Mixin that stores selected Image inputs with the H.264 codec.""" + + h264_streams: ClassVar[frozenset[str]] = frozenset() + + @rpc + def start(self) -> None: + recorder = cast("Recorder", self) + Module.start(recorder) + + if recorder.config.g.replay: + logger.info( + "Replay mode active — Recorder disabled, leaving %s untouched", + recorder.config.db_path, + ) + return + + db_path = Path(recorder.config.db_path) + if db_path.exists(): + if recorder.config.on_existing is OnExisting.OVERWRITE: + db_path.unlink() + logger.info("Deleted existing recording %s", db_path) + elif recorder.config.on_existing is OnExisting.BACKUP: + backup = backup_file(db_path, keep_last=recorder.config.backup_keep_last) + if backup is None: + logger.info("Removed existing recording %s (backup_keep_last=0)", db_path) + else: + logger.info("Backed up existing recording %s -> %s", db_path, backup) + else: + raise FileExistsError(f"Recording already exists: {db_path}") + + if not recorder.inputs: + logger.warning("Recorder has no In ports — nothing to record, subclass the Recorder") + return + + for name, port in recorder.inputs.items(): + stream: Stream[Image] + h264_streams: frozenset[str] = getattr(self, "h264_streams", frozenset()) + if name in h264_streams: + stream = recorder.store.stream(name, port.type, codec="h264") + else: + stream = recorder.store.stream(name, port.type) + recorder._port_to_stream(name, port, stream) + logger.info("Recording %s (%s)", name, port.type.__name__) + + +class H264E2ERecorder(_H264RecorderMixin, Recorder): + """Recorder with a typed image input for the synthetic H.264 demo.""" + + h264_streams: ClassVar[frozenset[str]] = frozenset({"color_image"}) + color_image: In[Image] + + +class H264WebcamRecorder(_H264RecorderMixin, Recorder): + """Recorder with a typed image input for webcam H.264 QA.""" + + h264_streams: ClassVar[frozenset[str]] = frozenset({"color_image"}) + color_image: In[Image] + + +class JpegBenchmarkRecorder(Recorder): + """Recorder for the JPEG side of the storage-size benchmark.""" + + jpeg_image: In[Image] + + +class H264BenchmarkRecorder(_H264RecorderMixin, Recorder): + """Recorder for the H.264 side of the storage-size benchmark.""" + + h264_streams: ClassVar[frozenset[str]] = frozenset({"h264_image"}) + h264_image: In[Image] + + +class H264StorageBenchmarkSourceConfig(SyntheticVideoSourceConfig): + video_path: str = "" + width: int = 320 + height: int = 240 + fps: float = 15.0 + frame_count: int = 150 + output_frame_id: str = "h264_storage_benchmark_camera" + + +class H264StorageBenchmarkSource(Module): + """Publish identical raw frames to JPEG and H.264 recording paths.""" + + config: H264StorageBenchmarkSourceConfig + jpeg_image: Out[Image] + h264_image: Out[Image] + + _thread: threading.Thread | None = None + _stop_event: threading.Event | None = None + + @rpc + def start(self) -> None: + super().start() + self._stop_event = threading.Event() + self._thread = threading.Thread(target=self._publish_loop, daemon=True) + self._thread.start() + video_path = self._configured_video_path() + source = str(video_path) if video_path is not None else "synthetic pattern" + logger.info( + "Started H.264/JPEG storage benchmark source: %s, %sx%s @ %.2f FPS for up to %s frames", + source, + self.config.width, + self.config.height, + self.config.fps, + self.config.frame_count, + ) + + @rpc + def stop(self) -> None: + if self._stop_event is not None: + self._stop_event.set() + if self._thread is not None: + self._thread.join(timeout=2.0) + self._thread = None + super().stop() + + def _publish_loop(self) -> None: + assert self._stop_event is not None + video_path = self._configured_video_path() + if video_path is not None: + self._publish_video_file(video_path) + return + + period = 1.0 / max(self.config.fps, 0.1) + next_publish = time.monotonic() + for seq in range(self.config.frame_count): + if self._stop_event.is_set(): + break + frame = self._make_frame(seq) + self.jpeg_image.publish(frame) + self.h264_image.publish(frame.copy()) + next_publish += period + time.sleep(max(0.0, next_publish - time.monotonic())) + logger.info("H.264/JPEG storage benchmark source finished publishing frames") + + def _configured_video_path(self) -> Path | None: + value = self.config.video_path or os.environ.get("DIMOS_H264_BENCHMARK_VIDEO", "") + return Path(value).expanduser() if value else None + + def _publish_video_file(self, video_path: Path) -> None: + assert self._stop_event is not None + if not video_path.exists(): + logger.error("Benchmark video file does not exist: %s", video_path) + return + + capture = cv2.VideoCapture(str(video_path)) + try: + if not capture.isOpened(): + logger.error("Failed to open benchmark video file: %s", video_path) + return + + period = 1.0 / max(self.config.fps, 0.1) + next_publish = time.monotonic() + published = 0 + for seq in range(self.config.frame_count): + if self._stop_event.is_set(): + break + ok, frame_bgr = capture.read() + if not ok: + break + frame = self._image_from_video_frame(frame_bgr) + self.jpeg_image.publish(frame) + self.h264_image.publish(frame.copy()) + published = seq + 1 + next_publish += period + time.sleep(max(0.0, next_publish - time.monotonic())) + logger.info( + "H.264/JPEG storage benchmark video source published %s frames from %s", + published, + video_path, + ) + finally: + capture.release() + + def _image_from_video_frame(self, frame_bgr: np.ndarray) -> Image: + if self.config.width > 0 and self.config.height > 0: + frame_bgr = cv2.resize( + frame_bgr, + (self.config.width, self.config.height), + interpolation=cv2.INTER_AREA, + ) + frame_rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB) + return Image( + data=frame_rgb, + format=ImageFormat.RGB, + frame_id=self.config.output_frame_id, + ts=time.time(), + ) + + def _make_frame(self, seq: int) -> Image: + yy, xx = np.indices((self.config.height, self.config.width), dtype=np.uint16) + base = (xx + (yy * 2) + (seq * 4) + self.config.seed) % 256 + marker = ((xx // 20 + yy // 20 + seq) % 2) * 35 + data = np.stack( + (base, (base + 70 + marker) % 256, (base + 145) % 256), + axis=2, + ).astype(np.uint8) + return Image( + data=data, + format=ImageFormat.RGB, + frame_id=self.config.output_frame_id, + ts=time.time(), + ) + + +class H264StorageBenchmarkReporterConfig(ModuleConfig): + jpeg_db_path: str = "benchmark_jpeg.db" + h264_db_path: str = "benchmark_h264.db" + min_wait_seconds: float = 12.0 + wait_seconds: float = 18.0 + stable_seconds: float = 2.0 + poll_seconds: float = 0.5 + + +class H264StorageBenchmarkReporter(Module): + """Log the JPEG vs H.264 SQLite DB size comparison.""" + + config: H264StorageBenchmarkReporterConfig + + _thread: threading.Thread | None = None + _stop_event: threading.Event | None = None + _last_summary: str | None = None + + @rpc + def start(self) -> None: + super().start() + self._stop_event = threading.Event() + self._thread = threading.Thread(target=self._report_loop, daemon=True) + self._thread.start() + + @rpc + def stop(self) -> None: + if self._stop_event is not None: + self._stop_event.set() + if self._thread is not None: + self._thread.join(timeout=2.0) + self._thread = None + super().stop() + + @rpc + def summary(self) -> str: + """Return the most recent JPEG-vs-H.264 storage benchmark summary.""" + return self._last_summary or "benchmark summary not available yet" + + def _report_loop(self) -> None: + assert self._stop_event is not None + started_at = time.monotonic() + deadline = time.monotonic() + self.config.wait_seconds + stable_since: float | None = None + last_sizes: tuple[int, int] | None = None + jpeg_path = Path(self.config.jpeg_db_path) + h264_path = Path(self.config.h264_db_path) + + while time.monotonic() < deadline and not self._stop_event.is_set(): + if jpeg_path.exists() and h264_path.exists(): + sizes = ( + _sqlite_snapshot_size(jpeg_path), + _sqlite_snapshot_size(h264_path), + ) + if sizes == last_sizes: + stable_since = stable_since or time.monotonic() + recording_window_elapsed = ( + time.monotonic() - started_at >= self.config.min_wait_seconds + ) + if ( + recording_window_elapsed + and time.monotonic() - stable_since >= self.config.stable_seconds + ): + self._log_sizes(sizes[0], sizes[1]) + return + else: + last_sizes = sizes + stable_since = None + time.sleep(self.config.poll_seconds) + + if jpeg_path.exists() and h264_path.exists(): + self._log_sizes( + _sqlite_snapshot_size(jpeg_path), + _sqlite_snapshot_size(h264_path), + ) + else: + missing = [str(path) for path in (jpeg_path, h264_path) if not path.exists()] + self._last_summary = f"benchmark DB size unavailable; missing={missing}" + logger.warning(self._last_summary) + + def _log_sizes(self, jpeg_bytes: int, h264_bytes: int) -> None: + ratio = h264_bytes / jpeg_bytes if jpeg_bytes else float("inf") + saved = jpeg_bytes - h264_bytes + saved_pct = (saved / jpeg_bytes * 100.0) if jpeg_bytes else 0.0 + self._last_summary = ( + "H.264/JPEG storage benchmark: " + f"jpeg={jpeg_bytes} bytes ({jpeg_bytes / 1024 / 1024:.2f} MiB), " + f"h264={h264_bytes} bytes ({h264_bytes / 1024 / 1024:.2f} MiB), " + f"h264/jpeg={ratio:.3f}, saved={saved} bytes ({saved_pct:.1f}%)" + ) + logger.info(self._last_summary) + print(self._last_summary, flush=True) + + +def _sqlite_snapshot_size(path: Path) -> int: + """Return compact SQLite DB size, even while WAL sidecars are active.""" + if not path.exists(): + return 0 + try: + with tempfile.NamedTemporaryFile(prefix=f"{path.stem}-", suffix=".db") as tmp: + source = sqlite3.connect(f"file:{path}?mode=ro", uri=True) + target = sqlite3.connect(tmp.name) + try: + source.backup(target) + finally: + target.close() + source.close() + return Path(tmp.name).stat().st_size + except sqlite3.Error: + return _sqlite_live_file_size(path) + + +def _sqlite_live_file_size(path: Path) -> int: + total = path.stat().st_size if path.exists() else 0 + for suffix in ("-wal", "-shm"): + sidecar = Path(f"{path}{suffix}") + if sidecar.exists(): + total += sidecar.stat().st_size + return total + + +class H264MemoryReplayConfig(ModuleConfig): + db_path: str = "webcam_h264.db" + speed: float = 1.0 + seek: float | None = None + duration: float | None = None + loop: bool = False + + +class H264MemoryReplay(Module): + """Replay a memory2 H.264 image stream as decoded `Image` frames.""" + + config: H264MemoryReplayConfig + color_image: Out[Image] + + @rpc + def start(self) -> None: + super().start() + store = self.register_disposable(SqliteStore(path=self.config.db_path, must_exist=True)) + replay = store.replay( + speed=self.config.speed, + seek=self.config.seek, + duration=self.config.duration, + loop=self.config.loop, + ) + decoder = H264Decoder(_webcam_h264_config) + + def publish_decoded(image: Image) -> None: + try: + self.color_image.publish(decoder.decode(image)) + except VideoDecodeGapError: + # V1 best effort: seek/replay can begin mid-GOP. Suppress deltas + # until the next keyframe restores decoder state. + return + + def on_error(error: Exception) -> None: + logger.error("H.264 replay pipeline error: %s", error, exc_info=True) + + self.register_disposable( + replay.streams.color_image.observable().subscribe( + on_next=publish_decoded, + on_error=on_error, + ) + ) + + +class H264VideoProbe(Module): + """Probe decoded H.264 `Image` delivery and report QA status.""" + + color_image: In[Image] + + _lock: threading.Lock + _received: int + _last_ts: float | None + _dimensions: tuple[int, int] | None + _frame_id: str | None + _failures: list[str] + + @rpc + def start(self) -> None: + super().start() + self._lock = threading.Lock() + self._received = 0 + self._last_ts = None + self._dimensions = None + self._frame_id = None + self._failures = [] + self.color_image.subscribe(self._on_image) + + def _on_image(self, image: Image) -> None: + with self._lock: + if self._last_ts is not None and image.ts < self._last_ts: + self._failures.append(f"timestamp regressed: {image.ts} < {self._last_ts}") + dims = (image.width, image.height) + if self._dimensions is None: + self._dimensions = dims + elif self._dimensions != dims: + self._failures.append(f"dimension changed: {dims} != {self._dimensions}") + if self._frame_id is None: + self._frame_id = image.frame_id + elif self._frame_id != image.frame_id: + self._failures.append(f"frame_id changed: {image.frame_id} != {self._frame_id}") + self._last_ts = image.ts + self._received += 1 + + if self._received % 10 == 0: + logger.info( + "H.264 video probe received %s %s frames", + self._received, + image.encoding, + ) + + @rpc + def summary(self) -> str: + """Return decoded-frame QA status for the synthetic H.264 demo.""" + with self._lock: + status = "ok" if not self._failures else "failed" + return ( + f"status={status} received={self._received} " + f"dimensions={self._dimensions} frame_id={self._frame_id!r} " + f"last_ts={self._last_ts} failures={self._failures}" + ) + + +_h264_config = H264Config(bitrate=1_000_000, target_fps=10, keyframe_interval=15) +_webcam_h264_config = H264Config(bitrate=2_000_000, target_fps=15, keyframe_interval=30) +_benchmark_h264_config = H264Config(bitrate=1_500_000, target_fps=15, keyframe_interval=30) +_inter_machine_h264_topic = "/demo_h264_inter_machine/color_image" + + +def _webcam() -> Webcam: + return Webcam(camera_index=0, width=640, height=480, fps=15.0) + + +demo_h264_video_e2e = autoconnect( + SyntheticVideoSource.blueprint(), + H264E2ERecorder.blueprint( + db_path="h264_video_e2e.db", + on_existing=OnExisting.OVERWRITE, + ), + H264VideoProbe.blueprint(), +).transports( + { + ("color_image", Image): H264LcmTransport( + "/demo_h264_video_e2e/color_image", + Image, + config=_h264_config, + decode_images=False, + ) + } +) + + +demo_h264_storage_benchmark = autoconnect( + H264StorageBenchmarkSource.blueprint(), + JpegBenchmarkRecorder.blueprint( + db_path="benchmark_jpeg.db", + on_existing=OnExisting.OVERWRITE, + ), + H264BenchmarkRecorder.blueprint( + db_path="benchmark_h264.db", + on_existing=OnExisting.OVERWRITE, + ), + H264StorageBenchmarkReporter.blueprint( + jpeg_db_path="benchmark_jpeg.db", + h264_db_path="benchmark_h264.db", + ), +).transports( + { + ("h264_image", Image): H264LcmTransport( + "/demo_h264_storage_benchmark/h264_image", + Image, + config=_benchmark_h264_config, + decode_images=False, + ) + } +) + + +demo_h264_webcam_record = autoconnect( + CameraModule.blueprint(hardware=_webcam, transform=None, frequency=15.0), + H264WebcamRecorder.blueprint( + db_path="webcam_h264.db", + on_existing=OnExisting.OVERWRITE, + ), +).transports( + { + ("color_image", Image): H264LcmTransport( + "/demo_h264_webcam_record/color_image", + Image, + config=_webcam_h264_config, + decode_images=False, + ) + } +) + + +demo_h264_webcam_rerun = autoconnect( + CameraModule.blueprint(hardware=_webcam, transform=None, frequency=15.0), + H264VideoProbe.blueprint(), + vis_module( + "rerun", + rerun_config={"pubsubs": [H264LCM(config=_webcam_h264_config)]}, + ), +).transports( + { + ("color_image", Image): H264LcmTransport( + "/demo_h264_webcam_rerun/color_image", + Image, + config=_webcam_h264_config, + ) + } +) + + +demo_h264_webcam_publish = autoconnect( + CameraModule.blueprint(hardware=_webcam, transform=None, frequency=15.0), +).transports( + { + ("color_image", Image): H264LcmTransport( + _inter_machine_h264_topic, + Image, + config=_webcam_h264_config, + ) + } +) + + +demo_h264_rerun_subscribe = autoconnect( + H264VideoProbe.blueprint(), + vis_module( + "rerun", + rerun_config={"pubsubs": [H264LCM(config=_webcam_h264_config)]}, + ), +).transports( + { + ("color_image", Image): H264LcmTransport( + _inter_machine_h264_topic, + Image, + config=_webcam_h264_config, + ) + } +) + + +demo_h264_webcam_replay = autoconnect( + H264MemoryReplay.blueprint(db_path="webcam_h264.db"), + H264VideoProbe.blueprint(), + vis_module( + "rerun", + rerun_config={"pubsubs": [H264LCM(config=_webcam_h264_config)]}, + ), +).transports( + { + ("color_image", Image): H264LcmTransport( + "/demo_h264_webcam_replay/color_image", + Image, + config=_webcam_h264_config, + ) + } +) diff --git a/dimos/protocol/video/h264.py b/dimos/protocol/video/h264.py new file mode 100644 index 0000000000..69548f8c57 --- /dev/null +++ b/dimos/protocol/video/h264.py @@ -0,0 +1,396 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +from collections.abc import Callable, Iterator, Sequence +from dataclasses import dataclass, field +from fractions import Fraction +from typing import TYPE_CHECKING, Any, Protocol, cast + +import numpy as np + +from dimos.msgs.sensor_msgs.Image import H264_IMAGE_ENCODING, Image, ImageFormat + +if TYPE_CHECKING: + import av + + +H264_CODEC = "h264" +H264_BITSTREAM = "annex_b" + + +class MissingVideoDependencyError(ImportError): + """Raised when H.264 support is selected without required video packages.""" + + +class UnsupportedVideoImageError(ValueError): + """Raised when an image cannot be represented by the H.264 adapter.""" + + +class VideoDecodeGapError(RuntimeError): + """Raised when a decoder cannot safely decode because GOP state is invalid.""" + + +@dataclass(frozen=True) +class H264Config: + """Configuration for opt-in H.264 image encoding.""" + + bitrate: int = 2_000_000 + target_fps: int = 30 + keyframe_interval: int = 30 + profile: str = "baseline" + preset: str = "veryfast" + tune: str = "zerolatency" + max_gop_frames: int = 30 + pixel_format: str = "yuv420p" + supported_formats: tuple[ImageFormat, ...] = field( + default_factory=lambda: (ImageFormat.RGB, ImageFormat.BGR, ImageFormat.GRAY) + ) + + def __post_init__(self) -> None: + if self.bitrate <= 0: + raise ValueError("bitrate must be positive") + if self.target_fps <= 0: + raise ValueError("target_fps must be positive") + if self.keyframe_interval <= 0: + raise ValueError("keyframe_interval must be positive") + if self.max_gop_frames <= 0: + raise ValueError("max_gop_frames must be positive") + + +class H264CodecAdapter(Protocol): + """DimOS-facing codec adapter; hides aiortc/RTP details from public APIs.""" + + def encode_image(self, image: Image, *, force_keyframe: bool) -> tuple[bytes, int]: ... + + def decode_image(self, image: Image) -> Image: ... + + +@dataclass(frozen=True) +class H264AccessUnit: + """Complete Annex B access unit for one source frame.""" + + data: bytes + + @classmethod + def from_rtp_payloads( + cls, + payloads: Sequence[bytes], + depayload: Callable[[bytes], bytes], + ) -> H264AccessUnit: + """Assemble RTP-sized H.264 payloads into one Annex B access unit.""" + + if not payloads: + raise ValueError("H.264 encoder returned no payloads") + data = b"".join(depayload(payload) for payload in payloads) + if not data.startswith((b"\x00\x00\x01", b"\x00\x00\x00\x01")): + raise ValueError("H.264 access unit is not Annex B byte-stream data") + return cls(data=data) + + +def ensure_supported_image(image: Image, config: H264Config) -> None: + """Validate the first-version H.264 image input contract.""" + + if image.encoding != "raw": + raise UnsupportedVideoImageError( + f"H.264 encoding expects raw Image data; got encoding={image.encoding!r}" + ) + if image.format not in config.supported_formats: + supported = ", ".join(fmt.value for fmt in config.supported_formats) + raise UnsupportedVideoImageError( + f"H.264 image encoding supports {supported}; got {image.format.value}" + ) + if image.dtype != np.dtype(np.uint8): + raise UnsupportedVideoImageError( + f"H.264 image encoding requires uint8 data; got {image.dtype}" + ) + if image.channels not in (1, 3): + raise UnsupportedVideoImageError( + f"H.264 image encoding requires 1 or 3 channels; got {image.channels}" + ) + + +def h264_metadata(image: Image) -> dict[str, Any]: + """Return validated H.264 metadata from an encoded Image.""" + + if image.encoding != H264_IMAGE_ENCODING: + raise ValueError(f"Expected H.264 encoded Image, got encoding={image.encoding!r}") + metadata = image.codec_metadata + if metadata.get("codec", H264_CODEC) != H264_CODEC: + raise ValueError(f"Expected codec={H264_CODEC!r}, got {metadata.get('codec')!r}") + if metadata.get("bitstream", H264_BITSTREAM) != H264_BITSTREAM: + raise ValueError( + f"Expected bitstream={H264_BITSTREAM!r}, got {metadata.get('bitstream')!r}" + ) + for key in ("seq", "is_keyframe", "keyframe_seq", "pts", "width", "height"): + if key not in metadata: + raise ValueError(f"H.264 encoded Image missing metadata field {key!r}") + if not isinstance(image.data, bytes): + raise ValueError("H.264 encoded Image payload must be bytes") + return metadata + + +class AiortcH264Codec: + """Small adapter around aiortc's H.264 encoder/decoder internals.""" + + def __init__(self, config: H264Config | None = None) -> None: + self.config = config or H264Config() + try: + from aiortc.codecs.h264 import ( + MAX_FRAME_RATE, + H264Decoder as AiortcDecoder, + H264Encoder as AiortcEncoder, + h264_depayload, + ) + from aiortc.jitterbuffer import JitterFrame + import av + except ImportError as exc: + raise MissingVideoDependencyError( + "H.264 image mode requires aiortc, PyAV, FFmpeg, and H.264 codec support" + ) from exc + + self._av = av + self._jitter_frame_type = JitterFrame + self._depayload = h264_depayload + + class ConfiguredAiortcEncoder(AiortcEncoder): + def __init__(self, h264_config: H264Config) -> None: + super().__init__() + self._dimos_config = h264_config + + def _encode_frame(self, frame: av.VideoFrame, force_keyframe: bool) -> Iterator[bytes]: + configured_bitrate = self.codec.bit_rate if self.codec else None + if self.codec and ( + frame.width != self.codec.width + or frame.height != self.codec.height + or configured_bitrate is None + or abs(self.target_bitrate - configured_bitrate) / configured_bitrate > 0.1 + ): + self.buffer_data = b"" + self.buffer_pts = None + self.codec = None + + if force_keyframe: + frame.pict_type = av.video.frame.PictureType.I + else: + frame.pict_type = av.video.frame.PictureType.NONE + + if self.codec is None: + self.codec = av.CodecContext.create("libx264", "w") + self.codec.width = frame.width + self.codec.height = frame.height + self.codec.bit_rate = self.target_bitrate + self.codec.pix_fmt = self._dimos_config.pixel_format + self.codec.framerate = Fraction(MAX_FRAME_RATE, 1) + self.codec.time_base = Fraction(1, MAX_FRAME_RATE) + self.codec.options = { + "level": "31", + "preset": self._dimos_config.preset, + "tune": self._dimos_config.tune, + } + self.codec.profile = _av_h264_profile(self._dimos_config.profile) + + data_to_send = b"" + for package in self.codec.encode(frame): + data_to_send += bytes(package) + + if data_to_send: + yield from self._split_bitstream(data_to_send) + + self._encoder = ConfiguredAiortcEncoder(self.config) + self._decoder = AiortcDecoder() + self._frame_index = 0 + self._time_base = Fraction(1, self.config.target_fps) + if hasattr(self._encoder, "target_bitrate"): + self._encoder.target_bitrate = self.config.bitrate + + def encode_image(self, image: Image, *, force_keyframe: bool) -> tuple[bytes, int]: + ensure_supported_image(image, self.config) + frame = self._to_video_frame(image) + payloads, pts = self._encoder.encode(frame, force_keyframe=force_keyframe) + access_unit = H264AccessUnit.from_rtp_payloads(payloads, self._depayload) + return access_unit.data, int(pts) + + def decode_image(self, image: Image) -> Image: + metadata = h264_metadata(image) + assert isinstance(image.data, bytes) + frame = self._jitter_frame_type(data=image.data, timestamp=int(metadata["pts"])) + decoded_frames = self._decoder.decode(frame) + if not decoded_frames: + raise VideoDecodeGapError("H.264 decoder produced no frame") + return self._from_video_frame(cast("av.VideoFrame", decoded_frames[0]), image) + + def _to_video_frame(self, image: Image) -> av.VideoFrame: + fmt = _av_input_format(image.format) + frame = self._av.VideoFrame.from_ndarray( + np.ascontiguousarray(image.require_raw("h264 encode")), format=fmt + ) + frame.pts = self._frame_index + frame.time_base = self._time_base + self._frame_index += 1 + return frame + + @staticmethod + def _from_video_frame(frame: av.VideoFrame, image: Image) -> Image: + image_format = image.format + arr = frame.to_ndarray(format=_av_input_format(image_format)) + return Image(data=arr, format=image_format, frame_id=image.frame_id, ts=image.ts) + + +class H264Encoder: + """Encode a normal DimOS Image stream into per-frame H.264 Images.""" + + def __init__( + self, + config: H264Config | None = None, + *, + codec: H264CodecAdapter | None = None, + ) -> None: + self.config = config or H264Config() + self._codec = codec or AiortcH264Codec(self.config) + self._seq = 0 + self._keyframe_seq = -1 + + def encode(self, image: Image, *, force_keyframe: bool = False) -> Image: + ensure_supported_image(image, self.config) + is_keyframe = self._should_force_keyframe(force_keyframe) + access_unit, pts = self._codec.encode_image(image, force_keyframe=is_keyframe) + if is_keyframe: + self._keyframe_seq = self._seq + metadata: dict[str, Any] = { + "seq": self._seq, + "codec": H264_CODEC, + "bitstream": H264_BITSTREAM, + "is_keyframe": is_keyframe, + "keyframe_seq": self._keyframe_seq, + "pts": pts, + "width": image.width, + "height": image.height, + "channels": image.channels, + "dtype": str(image.dtype), + } + self._seq += 1 + return Image.encoded( + data=access_unit, + encoding=H264_IMAGE_ENCODING, + format=image.format, + frame_id=image.frame_id, + ts=image.ts, + codec_metadata=metadata, + ) + + def _should_force_keyframe(self, requested: bool) -> bool: + if requested or self._seq == 0 or self._keyframe_seq < 0: + return True + since_keyframe = self._seq - self._keyframe_seq + return since_keyframe >= min(self.config.keyframe_interval, self.config.max_gop_frames) + + +class GopBuffer: + """Track H.264 GOP validity across an encoded Image stream.""" + + def __init__(self) -> None: + self.expected_seq: int | None = None + self.keyframe_seq: int | None = None + self.valid = False + + def accept(self, image: Image) -> bool: + """Return True when the encoded Image can be safely decoded.""" + + metadata = h264_metadata(image) + seq = int(metadata["seq"]) + keyframe_seq = int(metadata["keyframe_seq"]) + is_keyframe = bool(metadata["is_keyframe"]) + + if self.expected_seq is not None and seq != self.expected_seq: + self.valid = False + self.expected_seq = seq + 1 + + if is_keyframe: + self.keyframe_seq = seq + self.valid = True + return True + + if not self.valid: + return False + if self.keyframe_seq is None or keyframe_seq != self.keyframe_seq: + self.valid = False + return False + return True + + +class H264Decoder: + """Decode H.264 encoded Images into normal raw DimOS Images.""" + + def __init__( + self, + config: H264Config | None = None, + *, + codec: H264CodecAdapter | None = None, + gop_buffer: GopBuffer | None = None, + ) -> None: + self.config = config or H264Config() + self._codec = codec or AiortcH264Codec(self.config) + self._gop_buffer = gop_buffer or GopBuffer() + + def decode(self, image: Image) -> Image: + metadata = h264_metadata(image) + if not self._gop_buffer.accept(image): + raise VideoDecodeGapError( + f"Cannot decode H.264 image seq={metadata['seq']}; waiting for next keyframe" + ) + return self._codec.decode_image(image) + + +def _av_input_format(format: ImageFormat) -> str: + match format: + case ImageFormat.RGB: + return "rgb24" + case ImageFormat.BGR: + return "bgr24" + case ImageFormat.GRAY: + return "gray" + case _: + raise UnsupportedVideoImageError(f"Unsupported H.264 image format: {format.value}") + + +def _av_h264_profile(profile: str) -> str: + match profile.lower(): + case "baseline": + return "Baseline" + case "main": + return "Main" + case "high": + return "High" + case _: + return profile + + +__all__ = [ + "H264_BITSTREAM", + "H264_CODEC", + "AiortcH264Codec", + "GopBuffer", + "H264AccessUnit", + "H264CodecAdapter", + "H264Config", + "H264Decoder", + "H264Encoder", + "MissingVideoDependencyError", + "UnsupportedVideoImageError", + "VideoDecodeGapError", + "ensure_supported_image", + "h264_metadata", +] diff --git a/dimos/protocol/video/test_h264.py b/dimos/protocol/video/test_h264.py new file mode 100644 index 0000000000..71aba194bf --- /dev/null +++ b/dimos/protocol/video/test_h264.py @@ -0,0 +1,180 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +import builtins +from dataclasses import dataclass + +import numpy as np +import pytest + +from dimos.msgs.sensor_msgs.Image import H264_IMAGE_ENCODING, Image, ImageFormat +from dimos.protocol.video.h264 import ( + AiortcH264Codec, + GopBuffer, + H264AccessUnit, + H264Config, + H264Decoder, + H264Encoder, + MissingVideoDependencyError, + UnsupportedVideoImageError, + VideoDecodeGapError, + h264_metadata, +) + + +@dataclass +class FakeCodec: + encoded_force_keyframes: list[bool] + decoded_sequences: list[int] + + def encode_image(self, image: Image, *, force_keyframe: bool) -> tuple[bytes, int]: + self.encoded_force_keyframes.append(force_keyframe) + if force_keyframe: + return b"\x00\x00\x00\x01\x67sps\x00\x00\x00\x01\x68pps\x00\x00\x00\x01\x65idr", 90 + return b"\x00\x00\x00\x01\x41delta", 180 + + def decode_image(self, image: Image) -> Image: + metadata = h264_metadata(image) + self.decoded_sequences.append(int(metadata["seq"])) + return Image( + data=np.zeros((image.height, image.width, 3), dtype=np.uint8), + format=image.format, + frame_id=image.frame_id, + ts=image.ts, + ) + + +def _image(format: ImageFormat = ImageFormat.RGB, dtype: np.dtype = np.dtype(np.uint8)) -> Image: + return Image( + data=np.zeros((4, 6, 3), dtype=dtype), + format=format, + frame_id="cam", + ts=123.0, + ) + + +def _encoded(seq: int, *, key: bool, keyframe_seq: int | None = None) -> Image: + return Image.encoded( + data=b"\x00\x00\x00\x01\x65" if key else b"\x00\x00\x00\x01\x41", + encoding=H264_IMAGE_ENCODING, + format=ImageFormat.RGB, + frame_id="cam", + ts=123.0 + seq, + codec_metadata={ + "seq": seq, + "codec": "h264", + "bitstream": "annex_b", + "is_keyframe": key, + "keyframe_seq": seq if key else (0 if keyframe_seq is None else keyframe_seq), + "pts": seq * 90, + "width": 6, + "height": 4, + "channels": 3, + "dtype": "uint8", + }, + ) + + +def test_encoded_h264_image_lcm_roundtrips_metadata_and_access_unit() -> None: + image = _encoded(0, key=True) + + decoded = Image.lcm_decode(image.lcm_encode()) + + assert decoded == image + assert decoded.encoding == H264_IMAGE_ENCODING + assert decoded.codec_metadata["codec"] == "h264" + assert decoded.codec_metadata["bitstream"] == "annex_b" + assert isinstance(decoded.data, bytes) + assert decoded.data.startswith(b"\x00\x00\x00\x01") + + +def test_access_unit_assembles_depayloaded_annex_b_fragments() -> None: + unit = H264AccessUnit.from_rtp_payloads( + [b"payload-a", b"payload-b"], + lambda payload: b"\x00\x00\x00\x01" + payload, + ) + + assert unit.data == b"\x00\x00\x00\x01payload-a\x00\x00\x00\x01payload-b" + + +def test_encoder_emits_encoded_image_metadata_and_periodic_keyframes() -> None: + codec = FakeCodec(encoded_force_keyframes=[], decoded_sequences=[]) + encoder = H264Encoder(H264Config(keyframe_interval=2, max_gop_frames=2), codec=codec) + + p0 = encoder.encode(_image()) + p1 = encoder.encode(_image()) + p2 = encoder.encode(_image()) + + assert [p0.codec_metadata["seq"], p1.codec_metadata["seq"], p2.codec_metadata["seq"]] == [ + 0, + 1, + 2, + ] + assert [ + p0.codec_metadata["is_keyframe"], + p1.codec_metadata["is_keyframe"], + p2.codec_metadata["is_keyframe"], + ] == [True, False, True] + assert [ + p0.codec_metadata["keyframe_seq"], + p1.codec_metadata["keyframe_seq"], + p2.codec_metadata["keyframe_seq"], + ] == [0, 0, 2] + assert codec.encoded_force_keyframes == [True, False, True] + assert isinstance(p0.data, bytes) + assert b"\x67" in p0.data and b"\x68" in p0.data + + +def test_gop_buffer_suppresses_delta_after_sequence_gap_until_keyframe() -> None: + codec = FakeCodec(encoded_force_keyframes=[], decoded_sequences=[]) + decoder = H264Decoder(codec=codec, gop_buffer=GopBuffer()) + + assert decoder.decode(_encoded(0, key=True)).frame_id == "cam" + assert decoder.decode(_encoded(1, key=False, keyframe_seq=0)).frame_id == "cam" + + with pytest.raises(VideoDecodeGapError): + decoder.decode(_encoded(3, key=False, keyframe_seq=0)) + with pytest.raises(VideoDecodeGapError): + decoder.decode(_encoded(4, key=False, keyframe_seq=0)) + + assert decoder.decode(_encoded(5, key=True)).frame_id == "cam" + assert codec.decoded_sequences == [0, 1, 5] + + +def test_unsupported_image_format_and_dtype_fail_explicitly() -> None: + codec = FakeCodec(encoded_force_keyframes=[], decoded_sequences=[]) + encoder = H264Encoder(codec=codec) + + with pytest.raises(UnsupportedVideoImageError, match="RGBA"): + encoder.encode(_image(ImageFormat.RGBA)) + with pytest.raises(UnsupportedVideoImageError, match="uint8"): + encoder.encode(_image(dtype=np.dtype(np.uint16))) + + +def test_missing_aiortc_dependencies_raise_actionable_error( + monkeypatch: pytest.MonkeyPatch, +) -> None: + real_import = builtins.__import__ + + def fake_import(name: str, *args: object, **kwargs: object) -> object: + if name == "av" or name.startswith("aiortc"): + raise ImportError(name) + return real_import(name, *args, **kwargs) + + monkeypatch.setattr(builtins, "__import__", fake_import) + + with pytest.raises(MissingVideoDependencyError, match="H.264 image mode requires"): + AiortcH264Codec() diff --git a/dimos/robot/all_blueprints.py b/dimos/robot/all_blueprints.py index 24d90dd7fe..7653c33b9d 100644 --- a/dimos/robot/all_blueprints.py +++ b/dimos/robot/all_blueprints.py @@ -50,6 +50,13 @@ "demo-error-on-name-conflicts": "dimos.robot.unitree.demo_error_on_name_conflicts:demo_error_on_name_conflicts", "demo-google-maps-skill": "dimos.agents.skills.demo_google_maps_skill:demo_google_maps_skill", "demo-gps-nav": "dimos.agents.skills.demo_gps_nav:demo_gps_nav", + "demo-h264-rerun-subscribe": "dimos.protocol.video.demo_h264_video_e2e:demo_h264_rerun_subscribe", + "demo-h264-storage-benchmark": "dimos.protocol.video.demo_h264_video_e2e:demo_h264_storage_benchmark", + "demo-h264-video-e2e": "dimos.protocol.video.demo_h264_video_e2e:demo_h264_video_e2e", + "demo-h264-webcam-publish": "dimos.protocol.video.demo_h264_video_e2e:demo_h264_webcam_publish", + "demo-h264-webcam-record": "dimos.protocol.video.demo_h264_video_e2e:demo_h264_webcam_record", + "demo-h264-webcam-replay": "dimos.protocol.video.demo_h264_video_e2e:demo_h264_webcam_replay", + "demo-h264-webcam-rerun": "dimos.protocol.video.demo_h264_video_e2e:demo_h264_webcam_rerun", "demo-mcp-stress-test": "dimos.core.tests.stress_test_blueprint:demo_mcp_stress_test", "demo-object-scene-registration": "dimos.perception.demo_object_scene_registration:demo_object_scene_registration", "demo-osm": "dimos.mapping.osm.demo_osm:demo_osm", @@ -102,12 +109,15 @@ "unitree-g1-sim": "dimos.robot.unitree.g1.blueprints.perceptive.unitree_g1_sim:unitree_g1_sim", "unitree-go2": "dimos.robot.unitree.go2.blueprints.smart.unitree_go2:unitree_go2", "unitree-go2-agentic": "dimos.robot.unitree.go2.blueprints.agentic.unitree_go2_agentic:unitree_go2_agentic", + "unitree-go2-agentic-h264-video": "dimos.robot.unitree.go2.blueprints.agentic.unitree_go2_agentic_h264_video:unitree_go2_agentic_h264_video", "unitree-go2-agentic-huggingface": "dimos.robot.unitree.go2.blueprints.agentic.unitree_go2_agentic_huggingface:unitree_go2_agentic_huggingface", "unitree-go2-agentic-ollama": "dimos.robot.unitree.go2.blueprints.agentic.unitree_go2_agentic_ollama:unitree_go2_agentic_ollama", "unitree-go2-basic": "dimos.robot.unitree.go2.blueprints.basic.unitree_go2_basic:unitree_go2_basic", "unitree-go2-coordinator": "dimos.robot.unitree.go2.blueprints.basic.unitree_go2_coordinator:unitree_go2_coordinator", "unitree-go2-detection": "dimos.robot.unitree.go2.blueprints.smart.unitree_go2_detection:unitree_go2_detection", "unitree-go2-fleet": "dimos.robot.unitree.go2.blueprints.basic.unitree_go2_fleet:unitree_go2_fleet", + "unitree-go2-h264-detection": "dimos.robot.unitree.go2.blueprints.smart.unitree_go2_h264_detection:unitree_go2_h264_detection", + "unitree-go2-h264-video": "dimos.robot.unitree.go2.blueprints.smart.unitree_go2_h264_video:unitree_go2_h264_video", "unitree-go2-keyboard-teleop": "dimos.robot.unitree.go2.blueprints.basic.unitree_go2_keyboard_teleop:unitree_go2_keyboard_teleop", "unitree-go2-markers": "dimos.robot.unitree.go2.blueprints.smart.unitree_go2:unitree_go2_markers", "unitree-go2-memory": "dimos.robot.unitree.go2.blueprints.smart.unitree_go2:unitree_go2_memory", @@ -169,12 +179,20 @@ "gps-nav-skill-container": "dimos.agents.skills.gps_nav_skill.GpsNavSkillContainer", "grasping-module": "dimos.manipulation.grasping.grasping.GraspingModule", "gstreamer-camera-module": "dimos.hardware.sensors.camera.gstreamer.gstreamer_camera.GstreamerCameraModule", + "h264-benchmark-recorder": "dimos.protocol.video.demo_h264_video_e2e.H264BenchmarkRecorder", + "h264-e2-e-recorder": "dimos.protocol.video.demo_h264_video_e2e.H264E2ERecorder", + "h264-memory-replay": "dimos.protocol.video.demo_h264_video_e2e.H264MemoryReplay", + "h264-storage-benchmark-reporter": "dimos.protocol.video.demo_h264_video_e2e.H264StorageBenchmarkReporter", + "h264-storage-benchmark-source": "dimos.protocol.video.demo_h264_video_e2e.H264StorageBenchmarkSource", + "h264-video-probe": "dimos.protocol.video.demo_h264_video_e2e.H264VideoProbe", + "h264-webcam-recorder": "dimos.protocol.video.demo_h264_video_e2e.H264WebcamRecorder", "hosted-arm-teleop-module": "dimos.teleop.quest_hosted.hosted_extensions.HostedArmTeleopModule", "hosted-teleop-module": "dimos.teleop.quest_hosted.hosted_teleop_module.HostedTeleopModule", "hosted-teleop-recorder": "dimos.teleop.quest_hosted.blueprints.HostedTeleopRecorder", "hosted-twist-teleop-module": "dimos.teleop.quest_hosted.hosted_extensions.HostedTwistTeleopModule", "joint-trajectory-controller": "dimos.manipulation.control.trajectory_controller.joint_trajectory_controller.JointTrajectoryController", "joystick-module": "dimos.robot.unitree.b1.joystick_module.JoystickModule", + "jpeg-benchmark-recorder": "dimos.protocol.video.demo_h264_video_e2e.JpegBenchmarkRecorder", "keyboard-teleop": "dimos.robot.unitree.keyboard_teleop.KeyboardTeleop", "keyboard-teleop-module": "dimos.teleop.keyboard.keyboard_teleop_module.KeyboardTeleopModule", "local-planner": "dimos.navigation.nav_stack.modules.local_planner.local_planner.LocalPlanner", @@ -223,6 +241,7 @@ "simple-planner": "dimos.navigation.nav_stack.modules.simple_planner.simple_planner.SimplePlanner", "spatial-memory": "dimos.perception.spatial_perception.SpatialMemory", "speak-skill": "dimos.agents.skills.speak_skill.SpeakSkill", + "synthetic-video-source": "dimos.protocol.video.demo_h264_video_e2e.SyntheticVideoSource", "tare-planner": "dimos.navigation.nav_stack.modules.tare_planner.tare_planner.TarePlanner", "teleop-recorder": "dimos.teleop.utils.recorder.TeleopRecorder", "temporal-memory": "dimos.perception.experimental.temporal_memory.temporal_memory.TemporalMemory", diff --git a/dimos/robot/drone/camera_module.py b/dimos/robot/drone/camera_module.py index b77c597980..72c77fe2ee 100644 --- a/dimos/robot/drone/camera_module.py +++ b/dimos/robot/drone/camera_module.py @@ -138,7 +138,7 @@ def _processing_loop(self) -> None: self._latest_frame = None # Get numpy array from Image - img_array = frame.data + img_array = frame.require_raw("DroneCameraModule._process_frames") # Create header header = Header(self.camera_frame_id) diff --git a/dimos/robot/drone/drone_tracking_module.py b/dimos/robot/drone/drone_tracking_module.py index 277ecc509e..846faf26fe 100644 --- a/dimos/robot/drone/drone_tracking_module.py +++ b/dimos/robot/drone/drone_tracking_module.py @@ -115,7 +115,7 @@ def _get_latest_frame(self) -> np.ndarray[Any, np.dtype[Any]] | None: if self._latest_frame is None: return None # Convert Image to numpy array - data: np.ndarray[Any, np.dtype[Any]] = self._latest_frame.data + data = self._latest_frame.require_raw("DroneTrackingModule._get_latest_frame") return data @rpc diff --git a/dimos/robot/unitree/go2/blueprints/agentic/unitree_go2_agentic_h264_video.py b/dimos/robot/unitree/go2/blueprints/agentic/unitree_go2_agentic_h264_video.py new file mode 100644 index 0000000000..35e4176765 --- /dev/null +++ b/dimos/robot/unitree/go2/blueprints/agentic/unitree_go2_agentic_h264_video.py @@ -0,0 +1,35 @@ +#!/usr/bin/env python3 +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Agentic Go2 stack with H.264 transport enabled for the color image stream.""" + +from dimos.agents.mcp.mcp_client import McpClient +from dimos.agents.mcp.mcp_server import McpServer +from dimos.core.coordination.blueprints import autoconnect +from dimos.perception.perceive_loop_skill import PerceiveLoopSkill +from dimos.perception.spatial_perception import SpatialMemory +from dimos.robot.unitree.go2.blueprints.agentic._common_agentic import _common_agentic +from dimos.robot.unitree.go2.blueprints.smart.unitree_go2_h264_video import unitree_go2_h264_video + +unitree_go2_agentic_h264_video = autoconnect( + unitree_go2_h264_video, + SpatialMemory.blueprint(), + PerceiveLoopSkill.blueprint(), + McpServer.blueprint(), + McpClient.blueprint(), + _common_agentic, +).global_config(n_workers=12, robot_model="unitree_go2") + +__all__ = ["unitree_go2_agentic_h264_video"] diff --git a/dimos/robot/unitree/go2/blueprints/smart/unitree_go2_h264_detection.py b/dimos/robot/unitree/go2/blueprints/smart/unitree_go2_h264_detection.py new file mode 100644 index 0000000000..1d54f4979d --- /dev/null +++ b/dimos/robot/unitree/go2/blueprints/smart/unitree_go2_h264_detection.py @@ -0,0 +1,62 @@ +#!/usr/bin/env python3 +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Go2 replay stack for validating H.264 video transport with 3D detection.""" + +from dimos.core.coordination.blueprints import autoconnect +from dimos.core.transport import LCMTransport +from dimos.msgs.sensor_msgs.Image import Image +from dimos.msgs.sensor_msgs.PointCloud2 import PointCloud2 +from dimos.msgs.vision_msgs.Detection2DArray import Detection2DArray +from dimos.perception.detection.module3D import Detection3DModule +from dimos.robot.unitree.go2.blueprints.smart.unitree_go2_h264_video import ( + unitree_go2_h264_video, +) +from dimos.robot.unitree.go2.connection import GO2Connection + +unitree_go2_h264_detection = ( + autoconnect( + unitree_go2_h264_video, + Detection3DModule.blueprint( + camera_info=GO2Connection.camera_info_static, + ), + ) + .remappings( + [ + (Detection3DModule, "pointcloud", "global_map"), + ] + ) + .transports( + { + ("detections", Detection3DModule): LCMTransport( + "/detector3d/detections", Detection2DArray + ), + ("detected_pointcloud_0", Detection3DModule): LCMTransport( + "/detector3d/pointcloud/0", PointCloud2 + ), + ("detected_pointcloud_1", Detection3DModule): LCMTransport( + "/detector3d/pointcloud/1", PointCloud2 + ), + ("detected_pointcloud_2", Detection3DModule): LCMTransport( + "/detector3d/pointcloud/2", PointCloud2 + ), + ("detected_image_0", Detection3DModule): LCMTransport("/detector3d/image/0", Image), + ("detected_image_1", Detection3DModule): LCMTransport("/detector3d/image/1", Image), + ("detected_image_2", Detection3DModule): LCMTransport("/detector3d/image/2", Image), + } + ) +) + +__all__ = ["unitree_go2_h264_detection"] diff --git a/dimos/robot/unitree/go2/blueprints/smart/unitree_go2_h264_video.py b/dimos/robot/unitree/go2/blueprints/smart/unitree_go2_h264_video.py new file mode 100644 index 0000000000..d304d3d7cc --- /dev/null +++ b/dimos/robot/unitree/go2/blueprints/smart/unitree_go2_h264_video.py @@ -0,0 +1,96 @@ +#!/usr/bin/env python3 +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Go2 navigation stack with H.264 transport enabled for the color image stream.""" + +from typing import Any, cast + +from dimos.core.coordination.blueprints import autoconnect +from dimos.core.global_config import global_config +from dimos.core.transport import H264LcmTransport +from dimos.mapping.costmapper import CostMapper +from dimos.mapping.voxels import VoxelGridMapper +from dimos.msgs.sensor_msgs.Image import Image +from dimos.navigation.frontier_exploration.wavefront_frontier_goal_selector import ( + WavefrontFrontierExplorer, +) +from dimos.navigation.movement_manager.movement_manager import MovementManager +from dimos.navigation.patrolling.module import PatrollingModule +from dimos.navigation.replanning_a_star.module import ReplanningAStarPlanner +from dimos.protocol.video.demo_h264_video_e2e import H264VideoProbe +from dimos.protocol.video.h264 import H264Config, H264Decoder, VideoDecodeGapError +from dimos.robot.unitree.go2.blueprints.basic.unitree_go2_basic import rerun_config +from dimos.robot.unitree.go2.connection import GO2Connection +from dimos.visualization.vis_module import vis_module + +_go2_h264_config = H264Config( + bitrate=2_000_000, + target_fps=15, + keyframe_interval=30, +) +_go2_rerun_decoder: H264Decoder | None = None + + +def _convert_h264_color_image(image: Image) -> Any: + """Decode H.264 color frames before logging them in Rerun.""" + global _go2_rerun_decoder + + if image.encoding == "h264": + if _go2_rerun_decoder is None: + _go2_rerun_decoder = H264Decoder(_go2_h264_config) + try: + image = _go2_rerun_decoder.decode(image) + except (VideoDecodeGapError, ValueError): + # Replay/subscription can start mid-GOP. Suppress deltas until the + # next keyframe restores decoder state. + return None + return image.to_rerun() + + +_h264_rerun_config = { + **rerun_config, + "visual_override": { + **cast("dict[str, Any]", rerun_config["visual_override"]), + "world/color_image": _convert_h264_color_image, + }, +} + +unitree_go2_h264_video = ( + autoconnect( + vis_module( + viewer_backend=global_config.viewer, + rerun_config=_h264_rerun_config, + ), + GO2Connection.blueprint(), + VoxelGridMapper.blueprint(emit_every=5), + CostMapper.blueprint(), + ReplanningAStarPlanner.blueprint(), + WavefrontFrontierExplorer.blueprint(), + PatrollingModule.blueprint(), + MovementManager.blueprint(), + H264VideoProbe.blueprint(), + ) + .transports( + { + ("color_image", Image): H264LcmTransport( + "/color_image", + Image, + config=_go2_h264_config, + decode_images=True, + ), + } + ) + .global_config(n_workers=11, robot_model="unitree_go2") +) diff --git a/dimos/teleop/quest_hosted/video_track.py b/dimos/teleop/quest_hosted/video_track.py index 2a17c3c39a..07ad3a2846 100644 --- a/dimos/teleop/quest_hosted/video_track.py +++ b/dimos/teleop/quest_hosted/video_track.py @@ -104,7 +104,9 @@ async def recv(self) -> av.VideoFrame: self._first_mono = now pts = int((now - self._first_mono) * VIDEO_CLOCK_RATE) - frame = av.VideoFrame.from_ndarray(img.data, format=_AV_FORMAT_MAP.get(img.format, "bgr24")) + frame = av.VideoFrame.from_ndarray( + img.require_raw("CameraVideoTrack.recv"), format=_AV_FORMAT_MAP.get(img.format, "bgr24") + ) frame.pts = pts frame.time_base = VIDEO_TIME_BASE return frame diff --git a/docs/capabilities/memory/h264_storage_benchmark_report.md b/docs/capabilities/memory/h264_storage_benchmark_report.md new file mode 100644 index 0000000000..39e9c888ca --- /dev/null +++ b/docs/capabilities/memory/h264_storage_benchmark_report.md @@ -0,0 +1,93 @@ +# H.264 memory2 storage benchmark + +This report compares memory2 image storage size for the same frames stored with the default JPEG codec and the opt-in H.264 codec. + +## Method + +Blueprint: `demo-h264-storage-benchmark` + +The benchmark source publishes identical `Image` frames to two recorder streams: + +- `jpeg_image` uses the default memory2 `Image` codec (`JpegCodec`). +- `h264_image` uses `codec="h264"` and receives encoded H.264 images through `H264LcmTransport(decode_images=False)`. + +The reporter measures compact SQLite snapshot sizes with SQLite backup, so active WAL/SHM sidecars do not skew the comparison. + +## Public video sample + +Source video: + +- URL: `https://raw.githubusercontent.com/opencv/opencv/master/samples/data/vtest.avi` +- Local path used for the run: `/tmp/opencode/dimos-h264-benchmark-vtest.avi` +- File size: 8,131,690 bytes +- Source dimensions: 768 x 576 +- Source FPS: 10 +- Source frame count: 795 + +Benchmark settings: + +- Frames recorded: 150 +- Recorded dimensions: 320 x 240 +- Publish rate: 15 FPS +- H.264 bitrate: 1,500,000 bps +- H.264 keyframe interval: 30 frames +- H.264 profile/preset/tune: baseline / veryfast / zerolatency +- B-frames: disabled + +Command: + +```bash +rm -f benchmark_jpeg.db benchmark_jpeg.db-wal benchmark_jpeg.db-shm \ + benchmark_h264.db benchmark_h264.db-wal benchmark_h264.db-shm + +DIMOS_H264_BENCHMARK_VIDEO=/tmp/opencode/dimos-h264-benchmark-vtest.avi \ + uv run dimos run demo-h264-storage-benchmark --daemon + +sleep 22 +uv run dimos log -n 80 +uv run dimos stop +``` + +## Result + +| Codec | DB path | Rows | Blob rows | Blob bytes | DB size | +|---|---:|---:|---:|---:|---:| +| JPEG | `benchmark_jpeg.db` | 150 | 150 | 1,586,940 | 1,884,160 bytes (1.80 MiB) | +| H.264 | `benchmark_h264.db` | 150 | 150 | 1,008,355 | 1,126,400 bytes (1.07 MiB) | + +H.264 used 59.8% of the JPEG storage size and saved 757,760 bytes, a 40.2% reduction for this sample. + +## Direct ffmpeg H.264 comparison + +To estimate the cost of per-frame Foxglove-style storage versus a continuous H.264 stream, the same 150 frames were encoded directly with ffmpeg using similar H.264 settings: + +```bash +ffmpeg -y -v error \ + -i /tmp/opencode/dimos-h264-benchmark-vtest.avi \ + -vf "scale=320:240,fps=15" \ + -frames:v 150 \ + -c:v libx264 \ + -b:v 1500k -maxrate 1500k -bufsize 3000k \ + -profile:v baseline -preset veryfast -tune zerolatency \ + -g 30 -keyint_min 30 -sc_threshold 0 -bf 0 \ + -pix_fmt yuv420p \ + -f h264 /tmp/opencode/dimos-h264-benchmark-direct.h264 +``` + +| Output | Size | +|---|---:| +| Direct ffmpeg Annex B H.264 elementary stream | 1,603,706 bytes (1.53 MiB) | +| Direct ffmpeg MP4 container | 1,606,038 bytes (1.53 MiB) | +| memory2 H.264 SQLite DB | 1,126,400 bytes (1.07 MiB) | +| memory2 H.264 blob payloads only | 1,008,355 bytes (0.96 MiB) | + +In this run, memory2 H.264 storage was smaller than the direct ffmpeg elementary stream. That means this benchmark does not show a storage-efficiency penalty from the per-frame Annex B access-unit layout. It mostly shows that the current aiortc/libx264 path and the direct ffmpeg command did not produce identical rate-control output, even with similar nominal settings. + +The storage overhead within memory2 was measurable: the H.264 DB was 118,045 bytes larger than its stored blob payloads, or 11.7% over the blob bytes. That overhead includes observation metadata, SQLite page overhead, and one encoded-image envelope per frame. + +## Notes + +- The benchmark measures SQLite DB size, not raw compressed frame bytes alone. Observation metadata and blob table overhead are included for both codecs. +- The direct ffmpeg comparison is not a quality-matched encoder benchmark. It uses similar nominal settings to the DimOS H.264 config, but aiortc/PyAV and ffmpeg rate control can still choose different actual bit allocation. +- The sample video already contains temporal structure. Synthetic frames from the same benchmark blueprint produced a larger reduction in one local run: JPEG 2,109,440 bytes, H.264 983,040 bytes, a 53.4% reduction. +- H.264 results depend on bitrate, keyframe interval, resolution, motion, and scene texture. diff --git a/docs/capabilities/memory/index.md b/docs/capabilities/memory/index.md index c2fd3a35be..3641b45c33 100644 --- a/docs/capabilities/memory/index.md +++ b/docs/capabilities/memory/index.md @@ -206,3 +206,75 @@ plot_mosaic(matches.map(lambda obs: obs.data).to_list(), "assets/grid.png") ``` ![output](assets/grid.png) + +## H.264 image storage + +memory2 stores `Image` streams with the default JPEG image codec unless a stream +opts into H.264. Use H.264 storage for high-rate camera streams when disk usage +matters and frame-to-frame compression is worth the dependency cost. + +```python skip +from dimos.memory2.store.sqlite import SqliteStore +from dimos.msgs.sensor_msgs.Image import Image + +store = SqliteStore(path="robot_video.db") +color = store.stream( + "color_image", + Image, + codec="h264", +) +``` + +Recorder modules that need H.264 storage should create their target stream with +the same codec override: + +```python skip +from dimos.core.stream import In +from dimos.memory2.module import Recorder +from dimos.msgs.sensor_msgs.Image import Image + +class H264Recorder(Recorder): + color_image: In[Image] + + def start(self) -> None: + stream = self.store.stream("color_image", Image, codec="h264") + self._port_to_stream("color_image", self.color_image, stream) +``` + +H.264 storage keeps the normal memory2 shape: one observation row per source +frame. The blob for that observation stores one encoded `Image` whose data is a +complete H.264 Annex B access unit, not individual RTP fragments. H.264 frame +metadata lives in `Image.codec_metadata`. + +Metadata queries do not decode pixels. You can inspect timestamps, poses, tags, +frame ids, `Image.encoding`, and H.264 codec metadata without paying decode +cost. Accessing `obs.data` returns an encoded `Image` for H.264 streams. Use an +explicit H.264 decode session to convert replayed encoded images to raw pixel +images; that decoder suppresses deltas until the first keyframe at or after the +replay start point. + +H.264 storage currently supports uint8 RGB, BGR, and grayscale images. It raises +an explicit error for depth images, 16-bit images, alpha formats, and other +unsupported pixel layouts. The default `store.stream("color_image", Image)` path +continues to use JPEG. + +### Synthetic H.264 QA blueprint + +The `demo-h264-video-e2e` blueprint exercises both live H.264 LCM transport and +H.264 memory2 storage without a robot or physical camera: + +```bash skip +dimos run demo-h264-video-e2e --daemon +dimos log -f +``` + +The blueprint publishes deterministic synthetic RGB frames, records them to +`h264_video_e2e.db`, and runs a probe that logs decoded-frame count, dimensions, +timestamp monotonicity, frame id stability, and validation failures. Use it after +codec or storage changes to inspect: + +- logs from the source, recorder, and probe; +- memory2 metadata queries that do not touch `obs.data`; +- lazy `obs.data` decode after a valid keyframe, with best-effort suppression of undecodable deltas; +- replay of the recorded stream; and +- sequence-gap behavior, if you inject packet loss in the transport tests. diff --git a/docs/coding-agents/style.md b/docs/coding-agents/style.md index 3e13faae9b..657399b40c 100644 --- a/docs/coding-agents/style.md +++ b/docs/coding-agents/style.md @@ -49,3 +49,16 @@ from dimos.memory2.stream import Stream from dimos.memory2.store.base import Store from dimos.memory2.stream import Stream ``` + +## H.264 encoded Image shape + +When editing H.264 image transport or memory2 storage, keep the public module +contract as `Out[Image]` and `In[Image]`. Do not expose RTP fragments to module +authors or memory2 observations. + +For LCM, DDS, and memory2 storage, each encoded `Image` must contain all H.264 +NAL units for exactly one source frame as one Annex B access unit, with H.264 +frame state in `Image.codec_metadata`. Store one memory2 observation per source +frame. P-frames still depend on earlier GOP state, so decode from a valid +keyframe and suppress output after sequence gaps, late join, or replay seek until +the next keyframe. diff --git a/docs/development/testing.md b/docs/development/testing.md index 40e429797c..306a6403b7 100644 --- a/docs/development/testing.md +++ b/docs/development/testing.md @@ -63,6 +63,37 @@ When writing or debugging a specific self-hosted test, override `-m` yourself to pytest -m self_hosted dimos/path/to/test_something.py ``` +### H.264 image transport and storage tests + +The H.264 unit tests use fake codec adapters where possible, so they run in the +default suite without requiring FFmpeg/libx264. Run the focused tests after +changing encoded `Image` shape, eager/raw `Image` compatibility, H.264 transport, +memory2 storage, or the demo blueprint: + +```bash +uv run pytest dimos/protocol/video/test_h264.py dimos/msgs/sensor_msgs/test_image.py -q +uv run pytest dimos/protocol/pubsub/impl/test_h264_lcm.py dimos/protocol/pubsub/test_registry.py -q +uv run pytest dimos/memory2/video/test_h264_storage.py -q +CI=1 uv run pytest dimos/robot/test_all_blueprints_generation.py -q +``` + +The runtime H.264 path uses `aiortc`, PyAV, FFmpeg, and libx264. If a test or +manual run instantiates the real codec and those dependencies are missing, H.264 +should fail with an actionable dependency error. Keep fake-adapter unit tests in +place so the default suite still covers encoded-image semantics, GOP handling, +and memory2 behavior. + +When you add or rename a runnable demo blueprint, regenerate +`dimos/robot/all_blueprints.py` with: + +```bash +uv run pytest dimos/robot/test_all_blueprints_generation.py +``` + +Locally, that command may update `all_blueprints.py` and then fail to remind you +to commit the generated file. Re-run it with `CI=1` after the file is current to +verify generation is clean. + ## Testing on a fresh Ubuntu install CI tests dimos with pre-built images and cached deps, so it can't catch gaps diff --git a/docs/usage/blueprints.md b/docs/usage/blueprints.md index bceb356cd7..237ade1e14 100644 --- a/docs/usage/blueprints.md +++ b/docs/usage/blueprints.md @@ -163,6 +163,28 @@ base_blueprint = base_blueprint.transports({ Note: `expanded_blueprint` does not get the transport overrides because it's created from the initial value of `base_blueprint`, not the second. +For compressed camera streams, opt into H.264 on the image edge while keeping the +module stream type as `Image`: + +```python skip +from dimos.core.transport import H264LcmTransport +from dimos.msgs.sensor_msgs.Image import Image +from dimos.protocol.video.h264 import H264Config + +camera_blueprint = camera_blueprint.transports( + { + ("color_image", Image): H264LcmTransport( + "/camera/color_image", + Image, + config=H264Config(bitrate=2_000_000, keyframe_interval=30), + ) + } +) +``` + +`H264LcmTransport` publishes one complete H.264 Annex B packet per source frame. +Downstream modules still receive decoded `Image` values. + ## Remapping connections Sometimes you need to rename a connection to match what other modules expect. You can use `remappings` to rename module connections: diff --git a/docs/usage/transports/index.md b/docs/usage/transports/index.md index 5415aab1d0..062096716e 100644 --- a/docs/usage/transports/index.md +++ b/docs/usage/transports/index.md @@ -114,6 +114,60 @@ ros = nav.transports( ) ``` +### H.264 image transport + +Use `H264LcmTransport` when a high-rate `Image` stream needs video compression +over LCM. The module API stays the same: publishers still call +`Out[Image].publish(image)`, and subscribers still receive `Image` values. The +transport encodes each source frame as one H.264 Annex B access unit on the wire +and decodes it at the subscriber by default. Set `decode_images=False` when a +subscriber, such as a recorder, should receive encoded `Image` values instead. + +```python skip +from dimos.core.transport import H264LcmTransport +from dimos.msgs.sensor_msgs.Image import Image +from dimos.protocol.video.h264 import H264Config + +blueprint = blueprint.transports( + { + ("color_image", Image): H264LcmTransport( + "/camera/color_image", + Image, + config=H264Config( + bitrate=2_000_000, + target_fps=30, + keyframe_interval=30, + ), + ) + } +) +``` + +Encoded delivery uses the same public `Image` type but sets +`image.encoding == "h264"`, stores the Annex B payload in `image.data`, and stores +sequence/keyframe metadata in `image.codec_metadata`. Raw-pixel methods raise for +encoded images; decode them through a H.264 decode session first. + +H.264 transport is opt-in. The default image paths remain unchanged: normal LCM +uses the `Image` LCM encoding, and memory2 still stores images with the default +JPEG codec unless configured otherwise. + +H.264 is stateful. Keyframes bootstrap late subscribers and recovery after packet +loss. If an LCM subscriber detects a sequence gap in the middle of a GOP, it +suppresses decoded output until the next keyframe. Keyframes include decoder +parameter data, such as SPS/PPS, so a new subscriber can start decoding at a +keyframe. + +LCM H.264 is best-effort in v1. DimOS does not yet provide transport QoS, +durable keyframe cache, keyframe request, or PLI behavior for LCM. Those belong +in a later QoS/video-session design. + +The first H.264 image path supports uint8 RGB, BGR, and grayscale images. It +raises an explicit error for depth, 16-bit, alpha, or other unsupported image +formats instead of silently converting pixels. Selecting H.264 requires the video +dependencies used by `aiortc`, PyAV, FFmpeg, and libx264; projects that do not +select H.264 do not need those dependencies at runtime. + --- ## Using transports with modules @@ -473,6 +527,7 @@ python -m pytest -svm tool -k "not bytes" dimos/protocol/pubsub/benchmark/test_b | `Memory` | Testing only, single process | No | No | Minimal reference impl | | `SharedMemory` | Multi-process on same machine | Yes | No | Highest throughput (IPC) | | `LCM` | Robot LAN broadcast (UDP multicast) | Yes | Yes | Best-effort; can drop packets on LAN | +| `H264LCM` | Opt-in compressed `Image` streams | Yes | Yes | H.264 Annex B access units over LCM | | `Redis` | Network pubsub via Redis server | Yes | Yes | Central broker; adds hop | | `ROS` | ROS 2 topic communication | Yes | Yes | Integrates with RViz/ROS tools | | `DDS` | Cyclone DDS without ROS (WIP) | Yes | Yes | WIP |