Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions dimos/core/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,38 @@ def stop(self) -> None:
self._started = False


class H264LcmTransport(LCMTransport): # type: ignore[type-arg]
def __init__(
self,
topic: str,
type: type,
config: Any | None = None,
decode_images: bool = True,
**kwargs: Any,
) -> None: # type: ignore[no-untyped-def]

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current transports have type: ignore for legacy reasons. You shouldn't copy that. Use proper types.

from dimos.protocol.pubsub.impl.h264_lcm import H264LCM
from dimos.protocol.video.h264 import H264Config

self.config = config or H264Config()
self.decode_images = decode_images
self.lcm = H264LCM(config=self.config, decode_images=decode_images, **kwargs) # type: ignore[assignment]
super().__init__(topic, type)

def __reduce__(self): # type: ignore[no-untyped-def]
return (
H264LcmTransport,
(self.topic.topic, self.topic.lcm_type, self.config, self.decode_images),
)

def start(self) -> None:
self.lcm.start()
self._started = True

def stop(self) -> None:
self.lcm.stop()
self._started = False


class pSHMTransport(PubSubTransport[T]):
_started: bool = False

Expand Down
2 changes: 1 addition & 1 deletion dimos/experimental/security_demo/depth_estimator.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def _loop(self) -> None:

def _process(self, image: Image) -> None:
rgb = image.to_rgb()
pil_image = PILImage.fromarray(rgb.data)
pil_image = PILImage.fromarray(rgb.require_raw("DepthEstimator._process"))
if pil_image.width > _DEPTH_MAX_WIDTH:
scale = _DEPTH_MAX_WIDTH / pil_image.width
new_h = int(pil_image.height * scale)
Expand Down
4 changes: 2 additions & 2 deletions dimos/experimental/security_demo/security_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ def _patrol_step(self) -> None:
)

annotated = draw_bounding_box(
image.data.copy(),
image.require_raw("SecurityModule._detection_step").copy(),

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why change image type? doesn't this break now if using diff transport? you should transport the actual type modules require, feel free to use some shim that reconstructs the image on .data access

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@TomCC7 I think it's better to leave Image as is. If you want to have an encoded image message, can't you create a new message type? I think this change complicates the current Image class too much. We had this in the past as well where Image supported CUDA storage and the Image was overly complex.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For module-to-module transport cases, since all encode-decode are handled within transport layer, the image shouldn't require an overload. This addition was required to store encoded videoframe directly into recorder so that we enjoy smaller file size. And since we are doing encoding in transport layer, the message type can't be changed in the current design.

The other possible choice is to add a dedicated module to convert type, or directly modify recorder's storage process to encode directly. But then we'll need to duplicate the encode logic into separate places.

@TomCC7 TomCC7 Jun 16, 2026

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@paul-nechifor @leshy created a new pr that patches to this pr with a new design. Can check if that's the right direction: #2504. It basically mimic the current way of handling jpeg for transport/storage.

Generally I would say the core conflict here is that we are handling two usecases - a new transport that leverage video as a more efficient encoding during transportation and a new storage format for recorder - both need compression but they are in different layer (so we have to somewhat duplicate the encode/decode logic in two places).

One possible future cleanup I mentioned to @leshy before would be to reposition the recorder closer to the transport/import layer, so it can naturally record the already-encoded transport payload instead of receiving decoded module messages and encoding again for storage.

That would look more like:

             +--------------------+
             | recorder/storage   |
             | stores encoded pkt |
             +---------^----------+
                       |
Module Out[Image] -> encode once -> wire packet -> decode -> Module In[Image]

That may be a cleaner long-term architecture because transport and recording could share the same encoded packet path directly. But it is a larger architectural change. This patch keeps the existing module/recorder model and follows the repo’s current JPEG-style layering while avoiding encoded Image semantics.

list(best.bbox),
label=best.name,
confidence=best.confidence,
Expand Down Expand Up @@ -340,7 +340,7 @@ def _follow_step(self) -> None:
twist = self._visual_servo.compute_twist(best.bbox, latest_image.width)
self.cmd_vel.publish(twist)

overlay = latest_image.data.copy()
overlay = latest_image.require_raw("SecurityModule._follow_step").copy()
if hasattr(best, "mask") and best.mask is not None:
mask_bool = best.mask > 0
green = np.zeros_like(overlay)
Expand Down
2 changes: 1 addition & 1 deletion dimos/mapping/occupancy/visualize_path.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def visualize_path(
scale: int = 8,
) -> Image:
image = visualize_occupancy_grid(occupancy_grid, "rainbow")
bgr = image.data
bgr = image.require_raw("visualize_path")

bgr = cv2.resize(
bgr,
Expand Down
5 changes: 3 additions & 2 deletions dimos/mapping/osm/current_location_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ def _fetch_new_map(self) -> None:

assert self._map_image is not None
assert self._position is not None
pil_image = PILImage.fromarray(self._map_image.image.data)
map_data = self._map_image.image.require_raw("CurrentLocationMap._fetch_new_map")
pil_image = PILImage.fromarray(map_data)
draw = ImageDraw.Draw(pil_image)
x, y = self._map_image.latlon_to_pixel(self._position)
radius = 20
Expand All @@ -85,7 +86,7 @@ def _fetch_new_map(self) -> None:
width=3,
)

self._map_image.image.data[:] = np.array(pil_image)
map_data[:] = np.array(pil_image)

def _position_is_too_far_off_center(self) -> bool:
x, y = self._map_image.latlon_to_pixel(self._position) # type: ignore[arg-type, union-attr]
Expand Down
9 changes: 9 additions & 0 deletions dimos/memory2/codecs/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ def codec_from_id(codec_id_str: str, payload_module: str) -> Codec[Any]:


def _class_to_id(codec: Any) -> str:
explicit_id = getattr(codec, "CODEC_ID", None)
if explicit_id is not None:
if not isinstance(explicit_id, str):
raise TypeError(f"Codec CODEC_ID must be str, got {type(explicit_id).__name__}")
return explicit_id
name = type(codec).__name__
if name.endswith("Codec"):
return name[:-5].lower()
Expand All @@ -101,6 +106,10 @@ def _make_one(name: str, payload_module: str, inner: Codec[Any] | None = None) -
from dimos.memory2.codecs.jpeg import JpegCodec

return JpegCodec()
if name == "h264":
from dimos.memory2.video.h264 import H264ImageCodec

return H264ImageCodec()
if name == "lcm":
from dimos.memory2.codecs.lcm import LcmCodec

Expand Down
51 changes: 51 additions & 0 deletions dimos/memory2/video/h264.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# Copyright 2026 Dimensional Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

from dimos.msgs.sensor_msgs.Image import H264_IMAGE_ENCODING, Image


class H264ImageCodec:
"""memory2 codec for already-H.264 encoded Image payloads.

This codec deliberately does not decode pixels. It persists an ``Image`` whose
``encoding`` is ``"h264"`` and restores the same encoded image on read. A
separate H.264 decode session turns the encoded stream back into raw Images
for visualization or module consumption.
"""

CODEC_ID = "h264"

def encode(self, value: Image) -> bytes:
if value.encoding != H264_IMAGE_ENCODING:
raise ValueError(
f"H264ImageCodec stores encoded Images; got encoding={value.encoding!r}"
)
return value.lcm_encode()

def decode(self, data: bytes) -> Image:
image = Image.lcm_decode(data)
if image.encoding != H264_IMAGE_ENCODING:
raise ValueError(
f"H264ImageCodec expected encoded Image; got encoding={image.encoding!r}"
)
return image


def is_h264_image(image: Image) -> bool:
return image.encoding == H264_IMAGE_ENCODING


__all__ = ["H264ImageCodec", "is_h264_image"]
138 changes: 138 additions & 0 deletions dimos/memory2/video/test_h264_storage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
# Copyright 2026 Dimensional Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

from pathlib import Path
import platform

import numpy as np
import pytest

from dimos.memory2.backend import Backend
from dimos.memory2.codecs.base import codec_from_id, codec_id
from dimos.memory2.codecs.jpeg import JpegCodec
from dimos.memory2.store.sqlite import SqliteStore
from dimos.memory2.video.h264 import H264ImageCodec
from dimos.msgs.sensor_msgs.Image import H264_IMAGE_ENCODING, Image, ImageFormat

_SKIP_SQLITE_VEC = platform.machine() == "aarch64" or platform.system() == "Darwin"


def _raw_image(seq: int, fmt: ImageFormat = ImageFormat.RGB) -> Image:
data = np.full((2, 2, 3), seq, dtype=np.uint8)
if fmt == ImageFormat.GRAY:
data = np.full((2, 2), seq, dtype=np.uint8)
return Image.from_numpy(data, format=fmt, frame_id="cam", ts=float(seq))


def _encoded_image(seq: int, *, key: bool = True) -> Image:
return Image.encoded(
data=b"\x00\x00\x00\x01\x65" + bytes([seq]),
encoding=H264_IMAGE_ENCODING,
format=ImageFormat.RGB,
frame_id="cam",
ts=float(seq),
codec_metadata={
"seq": seq,
"codec": "h264",
"bitstream": "annex_b",
"is_keyframe": key,
"keyframe_seq": seq if key else 0,
"pts": seq * 90,
"width": 2,
"height": 2,
"channels": 3,
"dtype": "uint8",
},
)


def test_h264_image_codec_roundtrips_encoded_image() -> None:
codec = H264ImageCodec()
image = _encoded_image(1)

decoded = codec.decode(codec.encode(image))

assert decoded == image
assert decoded.encoding == H264_IMAGE_ENCODING
assert decoded.codec_metadata["seq"] == 1
assert decoded.width == 2
assert decoded.height == 2


def test_h264_image_codec_rejects_raw_images() -> None:
codec = H264ImageCodec()

with pytest.raises(ValueError, match="encoded Images"):
codec.encode(_raw_image(1))


def test_codec_id_and_factory_support_h264_for_image() -> None:
codec = H264ImageCodec()

assert codec_id(codec) == "h264"
assert isinstance(codec_from_id("h264", "dimos.msgs.sensor_msgs.Image.Image"), H264ImageCodec)


def test_h264_stream_stores_encoded_images_with_normal_backend(tmp_path: Path) -> None:
if _SKIP_SQLITE_VEC:
pytest.skip("sqlite-vec extension not loadable here")
db = tmp_path / "h264.db"
with SqliteStore(path=str(db)) as store:
stream = store.stream("cam", Image, codec="h264")
stored = stream.append(_encoded_image(1), ts=1.0)
assert stored.data.encoding == H264_IMAGE_ENCODING
assert stored.data.codec_metadata["seq"] == 1

with SqliteStore(path=str(db), must_exist=True) as reopened:
stream = reopened.stream("cam", Image)
obs = stream.first()
assert obs.data.encoding == H264_IMAGE_ENCODING
assert obs.data.codec_metadata["seq"] == 1
assert obs.data.width == 2


def test_h264_replay_emits_encoded_images(tmp_path: Path) -> None:
if _SKIP_SQLITE_VEC:
pytest.skip("sqlite-vec extension not loadable here")
store = SqliteStore(path=str(tmp_path / "replay.db"))
stream = store.stream("cam", Image, codec="h264")
stream.append(_encoded_image(1), ts=1.0)
stream.append(_encoded_image(2, key=False), ts=2.0)

replayed = list(store.replay().streams.cam.iterate())

assert [image.encoding for image in replayed] == [H264_IMAGE_ENCODING, H264_IMAGE_ENCODING]
assert [image.codec_metadata["seq"] for image in replayed] == [1, 2]


def test_default_image_stream_still_uses_jpeg_codec(tmp_path: Path) -> None:
if _SKIP_SQLITE_VEC:
pytest.skip("sqlite-vec extension not loadable here")
store = SqliteStore(path=str(tmp_path / "jpeg.db"))
stream = store.stream("rgb", Image)

source = stream._source
assert isinstance(source, Backend)
assert isinstance(source.codec, JpegCodec)


def test_encoded_images_reject_pixel_operations() -> None:
image = _encoded_image(1)

with pytest.raises(ValueError, match="requires raw Image data"):
image.to_rgb()
with pytest.raises(ValueError, match="requires raw Image data"):
image.as_numpy()
2 changes: 1 addition & 1 deletion dimos/memory2/vis/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def mosaic(
canvas = np.zeros((rows * cell_height, cols * cell_w, 3), dtype=np.uint8)
for i, img in enumerate(images):
r, c = divmod(i, cols)
tile = cv2.resize(img.to_bgr().data, (cell_w, cell_height))
tile = cv2.resize(img.to_bgr().require_raw("mosaic_observations"), (cell_w, cell_height))
canvas[r * cell_height : (r + 1) * cell_height, c * cell_w : (c + 1) * cell_w] = tile

result = Image(data=canvas, format=ImageFormat.BGR)
Expand Down
7 changes: 5 additions & 2 deletions dimos/models/vl/florence.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ def caption(self, image: Image, detail: str | CaptionDetail | None = None) -> st
task_prompt = CaptionDetail.from_str(detail).value

# Convert to PIL
pil_image = PILImage.fromarray(image.to_rgb().data)
pil_image = PILImage.fromarray(image.to_rgb().require_raw("Florence2Model.caption"))

# Process inputs
inputs = self._processor(text=task_prompt, images=pil_image, return_tensors="pt")
Expand Down Expand Up @@ -137,7 +137,10 @@ def caption_batch(self, *images: Image) -> list[str]:
task_prompt = self._task_prompt

# Convert all to PIL
pil_images = [PILImage.fromarray(img.to_rgb().data) for img in images]
pil_images = [
PILImage.fromarray(img.to_rgb().require_raw("Florence2Model.caption_batch"))
for img in images
]

# Process batch
inputs = self._processor(
Expand Down
2 changes: 1 addition & 1 deletion dimos/models/vl/moondream.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def _to_pil(self, image: Image | np.ndarray[Any, Any]) -> PILImage.Image:

image, _ = self._prepare_image(image)
rgb_image = image.to_rgb()
return PILImage.fromarray(rgb_image.data)
return PILImage.fromarray(rgb_image.require_raw("MoondreamVlModel._to_pil"))

def query(self, image: Image | np.ndarray, query: str, **kwargs) -> str: # type: ignore[no-untyped-def]
pil_image = self._to_pil(image)
Expand Down
2 changes: 1 addition & 1 deletion dimos/models/vl/moondream_hosted.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def _to_pil_image(self, image: Image | np.ndarray) -> PILImage.Image:
image = Image.from_numpy(image)

rgb_image = image.to_rgb()
return PILImage.fromarray(rgb_image.data)
return PILImage.fromarray(rgb_image.require_raw("MoondreamHostedVlModel._to_pil_image"))

def query(self, image: Image | np.ndarray, query: str, **kwargs) -> str: # type: ignore[no-untyped-def]
pil_image = self._to_pil_image(image)
Expand Down
Loading
Loading