diff --git a/dimos/learning/collection/blueprint.py b/dimos/learning/collection/blueprint.py new file mode 100644 index 0000000000..27a96be586 --- /dev/null +++ b/dimos/learning/collection/blueprint.py @@ -0,0 +1,73 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Recording blueprints. + +`CollectionRecorder` (a memory2 Recorder) captures the obs/action/status +streams to a SQLite session DB during the run and flushes it durably on +shutdown. DataPrep reads that DB afterwards. +""" + +from __future__ import annotations + +from datetime import datetime + +from dimos.core.coordination.blueprints import Blueprint, autoconnect +from dimos.core.global_config import global_config +from dimos.hardware.sensors.camera.realsense.camera import RealSenseCamera +from dimos.learning.collection.episode_monitor import EpisodeMonitorModule +from dimos.learning.collection.recorder import CollectionRecorder +from dimos.teleop.quest.blueprints import ( + teleop_quest_piper, + teleop_quest_xarm7, +) + + +def _session_db(robot: str) -> str: + """Timestamped session DB path, namespaced by robot.""" + return f"data/recordings/session_{robot}_{datetime.now():%Y%m%d_%H%M%S}.db" + + +def _camera_if_real() -> tuple[Blueprint, ...]: + """Real RealSense only off-sim. In `--simulation` the teleop coordinator's + MujocoSimModule already publishes color_image on /camera/color_image, so a + real camera would be redundant (and fail with no device connected).""" + if global_config.simulation: + return () + return (RealSenseCamera.blueprint(enable_pointcloud=False),) + + +# buttons / color_image / joint_state / status are left to autoconnect — each +# name is unique across the composed blueprint, so it resolves to a stable +# / topic shared by producer and recorder. +learning_collect_quest_xarm7 = autoconnect( + teleop_quest_xarm7, + *_camera_if_real(), + EpisodeMonitorModule.blueprint(), # default button_map: toggle=B, discard=Y + CollectionRecorder.blueprint(db_path=_session_db("xarm7")), +) + + +learning_collect_quest_piper = autoconnect( + teleop_quest_piper, + *_camera_if_real(), + EpisodeMonitorModule.blueprint(), # default button_map: toggle=B, discard=Y + CollectionRecorder.blueprint(db_path=_session_db("piper")), +) + + +__all__ = [ + "learning_collect_quest_piper", + "learning_collect_quest_xarm7", +] diff --git a/dimos/learning/collection/episode_monitor.py b/dimos/learning/collection/episode_monitor.py new file mode 100644 index 0000000000..0d194ea427 --- /dev/null +++ b/dimos/learning/collection/episode_monitor.py @@ -0,0 +1,215 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Single point of teleop-input → EpisodeStatus translation. + +Watches buttons / keyboard, runs the start/save/discard state machine, +publishes EpisodeStatus on every transition. RecordReplay (or whatever +records the bus) captures that stream into session.db; DataPrep reads +only the recorded EpisodeStatus events offline — never raw buttons or +keypresses. +""" + +from __future__ import annotations + +import threading +import time +from typing import Any, Literal + +from pydantic import BaseModel +from reactivex.disposable import Disposable + +from dimos.core.core import rpc +from dimos.core.module import Module, ModuleConfig +from dimos.core.stream import In, Out +from dimos.teleop.quest.quest_types import BUTTON_ALIASES, Buttons +from dimos.utils.logging_config import setup_logger + +logger = setup_logger() + + +class EpisodeStatus(BaseModel): + ts: float + state: Literal["idle", "recording"] + episodes_saved: int + episodes_discarded: int + last_event: Literal["start", "save", "discard", "init"] = "init" + task_label: str | None = None + + +class KeyPress(BaseModel): + """Single keypress event from a keyboard input source.""" + + key: str + ts: float + + +class EpisodeMonitorModuleConfig(ModuleConfig): + button_map: dict[Literal["start", "save", "discard", "toggle"], str] = { + "toggle": "B", + "discard": "Y", + } + keyboard_map: dict[Literal["start", "save", "discard", "toggle"], str] = {} + default_task_label: str | None = None + + +class EpisodeMonitorModule(Module): + config: EpisodeMonitorModuleConfig + + buttons: In[Buttons] + # TODO: no KeyPress producer exists yet — add a pygame keyboard module that + # publishes KeyPress so this port is actually fed (today only buttons drive it). + keyboard: In[KeyPress] + status: Out[EpisodeStatus] + + def __init__(self, **kwargs: Any) -> None: + super().__init__(**kwargs) + self._state: Literal["idle", "recording"] = "idle" + self._saved: int = 0 + self._discarded: int = 0 + self._last_event: Literal["start", "save", "discard", "init"] = "init" + self._prev_bits: dict[str, bool] = {} # rising-edge detection for buttons + self._lock = threading.Lock() + + @rpc + def start(self) -> None: + super().start() + # Registered so the base Module.stop() disposes them on shutdown. + self.register_disposable(Disposable(self.buttons.subscribe(self._on_buttons))) + self.register_disposable(Disposable(self.keyboard.subscribe(self._on_keyboard))) + # Emit an initial idle status so subscribers (and recorders) have a + # known starting point in the timeline. + with self._lock: + status = self._snapshot("init", time.time()) + self._emit(status) + + @rpc + def stop(self) -> None: + super().stop() + + @rpc + def reset_counters(self) -> EpisodeStatus: + with self._lock: + self._state = "idle" + self._saved = 0 + self._discarded = 0 + self._prev_bits = {} + status = self._snapshot("init", time.time()) + return self._emit(status) + + @rpc + def get_status(self) -> EpisodeStatus: + with self._lock: + return EpisodeStatus( + ts=time.time(), + state=self._state, + episodes_saved=self._saved, + episodes_discarded=self._discarded, + last_event=self._last_event, + task_label=self.config.default_task_label, + ) + + # ── port handlers ──────────────────────────────────────────────────────── + + def _on_buttons(self, msg: Buttons) -> None: + """Rising-edge detect against `config.button_map`; advance state machine.""" + ts = time.time() + # Edge-detect under the lock (it shares `_prev_bits` with reset_counters), + # then fire transitions outside it — `_transition` takes the same lock. + fired: list[Literal["start", "save", "discard", "toggle"]] = [] + with self._lock: + for event_name, alias_or_attr in self.config.button_map.items(): + attr = BUTTON_ALIASES.get(alias_or_attr, alias_or_attr) + try: + pressed = bool(getattr(msg, attr)) + except AttributeError: + continue + prev = self._prev_bits.get(attr, False) + self._prev_bits[attr] = pressed + if pressed and not prev: # rising edge + fired.append(event_name) + for event_name in fired: + self._transition(event_name, ts) + + def _on_keyboard(self, msg: KeyPress) -> None: + """Match `msg.key` against `config.keyboard_map`; advance state machine.""" + for event_name, key in self.config.keyboard_map.items(): + if msg.key == key: + self._transition(event_name, msg.ts) + break + + def _transition(self, event: Literal["start", "save", "discard", "toggle"], ts: float) -> None: + """State-machine transition. Publishes EpisodeStatus on every change. + + ``toggle`` resolves to ``start`` when idle and ``save`` when recording, + so one button can begin and end a take. The resolved event is what gets + published (DataPrep only ever sees start/save/discard). + """ + with self._lock: + if event == "toggle": + event = "save" if self._state == "recording" else "start" + if event == "start": + # Auto-commit any in-progress episode (matches DataPrep extractor). + if self._state == "recording": + self._saved += 1 + self._state = "recording" + elif event == "save": + if self._state == "recording": + self._saved += 1 + self._state = "idle" + elif event == "discard": + if self._state == "recording": + self._discarded += 1 + self._state = "idle" + # Snapshot under the mutation's lock so the event matches the state. + status = self._snapshot(event, ts) + self._emit(status) + + def _snapshot( + self, last_event: Literal["start", "save", "discard", "init"], ts: float + ) -> EpisodeStatus: + """Build a status from current state. Caller must hold `self._lock`.""" + self._last_event = last_event + return EpisodeStatus( + ts=ts, + state=self._state, + episodes_saved=self._saved, + episodes_discarded=self._discarded, + last_event=last_event, + task_label=self.config.default_task_label, + ) + + def _emit(self, status: EpisodeStatus) -> EpisodeStatus: + """Publish + log a snapshot. Must run outside the lock (does I/O).""" + self.status.publish(status) + self._log_status(status) + return status + + def _log_status(self, status: EpisodeStatus) -> None: + """One-line operator feedback to the terminal on every transition.""" + verb = { + "start": "▶ RECORDING episode", + "save": "✓ SAVED episode", + "discard": "✗ DISCARDED episode", + "init": "· ready", + }.get(status.last_event, status.last_event) + label = f" [{status.task_label}]" if status.task_label else "" + logger.info( + "[collect] %s%s (state=%s saved=%d discarded=%d)", + verb, + label, + status.state, + status.episodes_saved, + status.episodes_discarded, + ) diff --git a/dimos/learning/collection/recorder.py b/dimos/learning/collection/recorder.py new file mode 100644 index 0000000000..3e205074db --- /dev/null +++ b/dimos/learning/collection/recorder.py @@ -0,0 +1,51 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""CollectionRecorder — captures teleop collection streams to a memory2 DB. + +A `Recorder` (memory2) subscribes each declared `In` port and appends every +message to a SQLite store, flushing durably on stop(). Only *connected* +streams are recorded, so the same recorder works for any arm whose +coordinator publishes `joint_state`. + +The recorded stream names match what DataPrep reads: `color_image` +and `joint_state` (observation), `status` (episode segmentation). +""" + +from __future__ import annotations + +from pathlib import Path + +from dimos.core.stream import In +from dimos.learning.collection.episode_monitor import EpisodeStatus +from dimos.memory2.module import Recorder, RecorderConfig +from dimos.msgs.sensor_msgs.Image import Image +from dimos.msgs.sensor_msgs.JointState import JointState + + +class CollectionRecorderConfig(RecorderConfig): + db_path: str | Path = "data/recordings/session.db" + + +class CollectionRecorder(Recorder): + """Records the streams DataPrep consumes from a teleop session.""" + + config: CollectionRecorderConfig + + color_image: In[Image] # observation (camera) + joint_state: In[JointState] # observation + action (measured/next state) + status: In[EpisodeStatus] # episode start/save/discard segmentation + + +__all__ = ["CollectionRecorder", "CollectionRecorderConfig"] diff --git a/dimos/learning/collection/test_episode_monitor.py b/dimos/learning/collection/test_episode_monitor.py new file mode 100644 index 0000000000..b0a7307714 --- /dev/null +++ b/dimos/learning/collection/test_episode_monitor.py @@ -0,0 +1,167 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Unit tests for the EpisodeMonitor state machine. + +The module is constructed normally; only its boot side effects (the asyncio +loop + RPC transport that `Module.__init__` starts) are patched out, and its +`status` Out port is replaced with a mock so published EpisodeStatus events can +be inspected. Drives the button/keyboard handlers directly and asserts on the +state machine these events feed into `extract_episodes`. +""" + +from __future__ import annotations + +from collections.abc import Callable, Iterator + +import pytest +import pytest_mock + +from dimos.core.module import LCMRPC +from dimos.learning.collection.episode_monitor import ( + EpisodeMonitorModule, + EpisodeStatus, + KeyPress, +) +from dimos.teleop.quest.quest_types import BUTTON_ALIASES, Buttons + + +@pytest.fixture +def make_monitor( + mocker: pytest_mock.MockerFixture, +) -> Iterator[Callable[..., EpisodeMonitorModule]]: + """Factory for an EpisodeMonitorModule with its boot patched out. + + `Module.__init__` starts an asyncio loop + RPC transport; patch both so the + test exercises only the state machine. The `status` port is a mock whose + `publish` calls record the emitted EpisodeStatus. Every built module is + stopped on teardown. + """ + mocker.patch("dimos.core.module.get_loop", return_value=(mocker.MagicMock(), None)) + mocker.patch.object(LCMRPC, "__init__", return_value=None) + mocker.patch.object(LCMRPC, "serve_module_rpc", return_value=None) + mocker.patch.object(LCMRPC, "start", return_value=None) + mocker.patch.object(LCMRPC, "stop", return_value=None) + + built: list[EpisodeMonitorModule] = [] + + def _make(**config: object) -> EpisodeMonitorModule: + m = EpisodeMonitorModule(**config) + m.status = mocker.MagicMock() # type: ignore[assignment] + built.append(m) + return m + + yield _make + for m in built: + m.stop() + + +def _events(monitor: EpisodeMonitorModule) -> list[EpisodeStatus]: + """The EpisodeStatus objects published on the monitor's `status` port.""" + return [call.args[0] for call in monitor.status.publish.call_args_list] # type: ignore[attr-defined] + + +def _press(monitor: EpisodeMonitorModule, alias: str, ts: float) -> None: + """Rising edge: release-then-press the given Quest button alias.""" + attr = BUTTON_ALIASES[alias] + released = Buttons() + pressed = Buttons() + setattr(pressed, attr, True) + monitor._on_buttons(released) + monitor._on_buttons(pressed) + + +def test_toggle_starts_then_saves(make_monitor: Callable[..., EpisodeMonitorModule]) -> None: + m = make_monitor() # default map: toggle=B, discard=Y + _press(m, "B", ts=1.0) # idle → recording + _press(m, "B", ts=2.0) # recording → idle (saved) + + events = _events(m) + assert [e.last_event for e in events] == ["start", "save"] + assert events[-1].state == "idle" + assert events[-1].episodes_saved == 1 + assert events[-1].episodes_discarded == 0 + + +def test_discard_does_not_count_as_saved( + make_monitor: Callable[..., EpisodeMonitorModule], +) -> None: + m = make_monitor() + _press(m, "B", ts=1.0) # start + _press(m, "Y", ts=2.0) # discard + + last = _events(m)[-1] + assert last.state == "idle" + assert last.episodes_saved == 0 + assert last.episodes_discarded == 1 + + +def test_start_while_recording_autocommits_previous( + make_monitor: Callable[..., EpisodeMonitorModule], +) -> None: + # toggle (start), then an explicit start via keyboard while still recording: + # the in-progress episode auto-commits (matches the offline extractor). + m = make_monitor(keyboard_map={"start": "r"}) + _press(m, "B", ts=1.0) # recording + m._on_keyboard(KeyPress(key="r", ts=2.0)) # start again → auto-commit prior + + last = _events(m)[-1] + assert last.last_event == "start" + assert last.state == "recording" + assert last.episodes_saved == 1 # the auto-committed one + + +def test_no_event_without_rising_edge( + make_monitor: Callable[..., EpisodeMonitorModule], +) -> None: + m = make_monitor() + pressed = Buttons() + pressed.right_secondary = True # B held + m._on_buttons(pressed) + m._on_buttons(pressed) # still held — no new edge + assert [e.last_event for e in _events(m)] == ["start"] + + +def test_published_status_is_internally_consistent( + make_monitor: Callable[..., EpisodeMonitorModule], +) -> None: + # Every published event's counters/state must match the event it carries — + # the snapshot is taken under the same lock as the mutation. + m = make_monitor() + _press(m, "B", 1.0) # start + _press(m, "B", 2.0) # save (1) + _press(m, "B", 3.0) # start + _press(m, "B", 4.0) # save (2) + _press(m, "B", 5.0) # start + _press(m, "Y", 6.0) # discard (1) + + events = _events(m) + for e in events: + if e.last_event == "start": + assert e.state == "recording" + elif e.last_event in ("save", "discard"): + assert e.state == "idle" + assert events[-1].episodes_saved == 2 + assert events[-1].episodes_discarded == 1 + + +def test_reset_counters(make_monitor: Callable[..., EpisodeMonitorModule]) -> None: + m = make_monitor() + _press(m, "B", 1.0) + _press(m, "B", 2.0) + status = m.reset_counters() + assert status.episodes_saved == 0 + assert status.episodes_discarded == 0 + assert status.state == "idle" + assert status.last_event == "init" diff --git a/dimos/learning/dataprep/build.py b/dimos/learning/dataprep/build.py new file mode 100644 index 0000000000..5fc9f7620f --- /dev/null +++ b/dimos/learning/dataprep/build.py @@ -0,0 +1,202 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""DataPrep build orchestration — the impure layer over `core.py`. + +`run_dataprep` (build) and `inspect_dataset` (read-back) own the I/O and side +effects — open/close the store, drive the writer/reader, emit logs, write +files; they compose the pure helpers in `core.py` and the per-format +readers/writers. Exposed by the `dimos dataprep` subcommand. +""" + +from __future__ import annotations + +from collections.abc import Iterator +import json +from pathlib import Path +from typing import Any + +from dimos.learning.dataprep.core import ( + DataPrepConfig, + Episode, + Sample, + extract_episodes, + get_writer, + iter_episode_samples, +) +from dimos.utils.logging_config import setup_logger + +logger = setup_logger() + + +def _write_dimos_meta(dataset_path: Path, config: DataPrepConfig, episodes: list[Episode]) -> None: + """Sidecar describing how this dataset was built, recording the obs/action + schema alongside the dataset.""" + meta = { + "source": config.source, + "observation": {k: v.model_dump() for k, v in config.observation.items()}, + "action": {k: v.model_dump() for k, v in config.action.items()}, + "sync": config.sync.model_dump(), + "episodes": [ + { + "id": e.id, + "start_ts": e.start_ts, + "end_ts": e.end_ts, + "task_label": e.task_label, + "success": e.success, + } + for e in episodes + ], + "format": config.output.format, + "metadata": config.output.metadata, + } + # Writers return a directory (lerobot) or a file (hdf5). Put the sidecar + # *inside* a directory, or *beside* a file (`.dimos_meta.json`). + if dataset_path.is_dir(): + meta_path = dataset_path / "dimos_meta.json" + else: + meta_path = dataset_path.with_name(f"{dataset_path.stem}.dimos_meta.json") + with open(meta_path, "w") as f: + json.dump(meta, f, indent=2, default=str) + + +def run_dataprep(config: DataPrepConfig) -> Path: + """Build a dataset from a recording and return the dataset path. + + Opens the source store, extracts episodes, streams samples through the + configured format writer, and writes `dimos_meta.json`. Synchronous — + raises on failure so the caller owns the exit code. + """ + from dimos.memory2.store.sqlite import SqliteStore + + shared = set(config.observation) & set(config.action) + if shared: + raise ValueError( + f"observation and action share feature name(s) {sorted(shared)}; " + f"give each a distinct key (they may still map to the same stream)." + ) + + logger.info( + "[dataprep] starting build source=%s extractor=%s output=%s", + config.source, + config.episodes.extractor, + config.output.path, + ) + store = SqliteStore(path=config.source, must_exist=True) + try: + logger.info("[dataprep] streams in source: %s", store.list_streams()) + all_eps = extract_episodes(store, config.episodes) + # Reindex survivors so sidecar ids match the writers' episode_index. + episodes = [ + e.model_copy(update={"id": f"ep_{i:06d}"}) + for i, e in enumerate(e for e in all_eps if e.success) + ] + logger.info( + "[dataprep] episodes extracted: %d total / %d successful", + len(all_eps), + len(episodes), + ) + + if not episodes: + raise RuntimeError( + f"No successful episodes extracted from {config.source!r} " + f"using extractor={config.episodes.extractor!r}. " + f"Available streams: {store.list_streams()}. " + f"For a recording with no episode_status stream, set " + f"extractor='ranges' with explicit (start, end) tuples." + ) + + obs_keys = set(config.observation) + action_keys = set(config.action) + streams = {**config.observation, **config.action} + logger.info( + "[dataprep] obs streams=%s action streams=%s sync=%s", + sorted(obs_keys), + sorted(action_keys), + config.sync.model_dump(), + ) + writer = get_writer(config.output.format) + # fps drives written timestamps + video rate, so tie it to the resample + # rate; an explicit metadata.fps still wins. + output = config.output + if config.sync.rate_hz > 0 and "fps" not in output.metadata: + output = output.model_copy( + update={"metadata": {**output.metadata, "fps": config.sync.rate_hz}} + ) + logger.info("[dataprep] writing %s dataset to %s", config.output.format, output.path) + + samples_seen = 0 + episodes_done = 0 + total = len(episodes) + + def _all_samples() -> Iterator[Sample]: + nonlocal samples_seen, episodes_done + for ep in episodes: + for sample in iter_episode_samples( + store=store, + episode=ep, + streams=streams, + sync=config.sync, + obs_keys=obs_keys, + action_keys=action_keys, + ): + samples_seen += 1 + if samples_seen % 50 == 0: + logger.info( + "[dataprep] %.1f%% samples=%d ep %d/%d", + 100.0 * episodes_done / total, + samples_seen, + episodes_done, + total, + ) + yield sample + episodes_done += 1 + + dataset_path = Path(writer(_all_samples(), output)) + _write_dimos_meta(dataset_path, config, episodes) + logger.info( + "[dataprep] succeeded — wrote %d samples across %d episodes to %s", + samples_seen, + total, + dataset_path, + ) + return dataset_path + finally: + store.stop() + + +def inspect_dataset(path: Path | str, fmt: str | None = None) -> dict[str, Any]: + """Summarize a built dataset: observation/action features (shape + dtype), + episode/frame counts, and whether shapes/lengths are uniform. + + `fmt` is auto-detected when omitted: a `.hdf5`/`.h5` file → hdf5; a + directory containing `meta/info.json` → lerobot. + """ + from dimos.learning.dataprep.core import get_inspector + + p = Path(path) + if fmt is None: + if p.suffix in (".h5", ".hdf5"): + fmt = "hdf5" + elif (p / "meta" / "info.json").exists(): + fmt = "lerobot" + else: + raise ValueError( + f"Cannot detect dataset format at {p}: expected a .hdf5 file or a " + f"lerobot directory with meta/info.json. Pass --format explicitly." + ) + return get_inspector(fmt)(p) + + +__all__ = ["inspect_dataset", "run_dataprep"] diff --git a/dimos/learning/dataprep/cli.py b/dimos/learning/dataprep/cli.py new file mode 100644 index 0000000000..654e31fc56 --- /dev/null +++ b/dimos/learning/dataprep/cli.py @@ -0,0 +1,112 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Implementation of the `dimos dataprep` subcommand (build + inspect). + +DataPrep is a one-shot batch transform, not a long-lived module, so it runs +as a plain command over the pure helpers in `dimos.learning.dataprep.core` +and exits with a 0/1 status — no coordinator, no blocking loop. + +The obs/action stream maps are nested, so they come from a JSON +`DataPrepConfig` via `--config`; simple flags override `source`/`output`/ +`format` on top. See `dimos/learning/dataprep/example_config.json`. +""" + +from __future__ import annotations + +import json +from pathlib import Path +from typing import TYPE_CHECKING, Literal, cast + +import typer + +if TYPE_CHECKING: + from dimos.learning.dataprep.core import DataPrepConfig + + +def _load_config( + config_path: Path | None, + source: Path | None, + output: Path | None, + output_format: str | None, +) -> DataPrepConfig: + """Build a DataPrepConfig from an optional JSON file + flag overrides.""" + from dimos.learning.dataprep.core import DataPrepConfig, OutputConfig + + if config_path is not None: + cfg = DataPrepConfig.model_validate_json(Path(config_path).read_text()) + else: + cfg = DataPrepConfig() + + updates: dict[str, object] = {} + if source is not None: + updates["source"] = str(source) + if output is not None or output_format is not None: + fmt = ( + cast("Literal['lerobot', 'hdf5']", output_format) + if output_format + else cfg.output.format + ) + updates["output"] = OutputConfig( + format=fmt, + path=output or cfg.output.path, + metadata=cfg.output.metadata, + ) + return cfg.model_copy(update=updates) if updates else cfg + + +def build( + config_path: Path | None, + source: Path | None, + output: Path | None, + output_format: str | None, +) -> None: + from dimos.learning.dataprep.build import run_dataprep + + cfg = _load_config(config_path, source, output, output_format) + if not cfg.source: + typer.echo("error: no source given (use --source or set it in --config)", err=True) + raise typer.Exit(2) + if not cfg.observation and not cfg.action: + typer.echo( + "error: no observation/action streams configured; pass --config with the " + "stream maps (see dimos/learning/dataprep/example_config.json)", + err=True, + ) + raise typer.Exit(2) + + try: + path = run_dataprep(cfg) + except Exception as e: + # CLI boundary: any failure becomes a clean message + non-zero exit + # instead of a traceback. run_dataprep raises specific errors internally. + typer.echo(f"dataprep build failed: {e}", err=True) + raise typer.Exit(1) + typer.echo(f"✓ wrote dataset to {path}") + + +def inspect(dataset: Path | None, output_format: str | None) -> None: + from dimos.learning.dataprep.build import inspect_dataset + + if dataset is None: + typer.echo("error: no dataset given (pass a .hdf5 file or a lerobot directory)", err=True) + raise typer.Exit(2) + + try: + info = inspect_dataset(dataset, output_format) + except Exception as e: + # CLI boundary: surface failures as a message + non-zero exit, not a traceback. + typer.echo(f"dataprep inspect failed: {e}", err=True) + raise typer.Exit(1) + typer.echo(json.dumps(info, indent=2, default=str)) diff --git a/dimos/learning/dataprep/core.py b/dimos/learning/dataprep/core.py new file mode 100644 index 0000000000..add219b4ba --- /dev/null +++ b/dimos/learning/dataprep/core.py @@ -0,0 +1,382 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Dataset-shape types + pure helpers. + +Sub-configs (StreamField, SyncConfig, OutputConfig, EpisodeExtractor) and +data records (Episode, Sample) live here. So do the stateless functions +that walk samples — `resolve_field`, `extract_episodes`, +`iter_episode_samples`. Pure and side-effect-free; importable without +booting a Module. + +The impure orchestration that composes these (opening the store, driving +the writer, writing files) lives in `build.py`. +""" + +from __future__ import annotations + +import bisect +from collections.abc import Callable, Iterator +from pathlib import Path +from typing import TYPE_CHECKING, Any, Literal + +import numpy as np +from pydantic import BaseModel, ConfigDict, Field + +from dimos.protocol.service.spec import BaseConfig + +if TYPE_CHECKING: + from dimos.memory2.store.sqlite import SqliteStore + from dimos.memory2.stream import Stream + +# A dataset format is a `formats/.py` module exposing `write` and +# `inspect` with these signatures, registered in `get_writer`/`get_inspector`. +Writer = Callable[[Iterator["Sample"], "OutputConfig"], Path] +Inspector = Callable[[Path], dict[str, Any]] + + +# ───────────────────────────────────────────────────────────────────────────── +# Sub-configs +# ───────────────────────────────────────────────────────────────────────────── + + +class EpisodeExtractor(BaseConfig): + extractor: Literal["episode_status", "ranges"] = "episode_status" + # Recorded stream name for EpisodeStatus events. Must match the recorder's + # `status` In port (CollectionRecorder records it as "status"). + status_stream: str = "status" + ranges: list[tuple[float, float]] | None = None + + +class StreamField(BaseConfig): + stream: str + field: str | None = None + + +class SyncConfig(BaseConfig): + anchor: str + rate_hz: float + tolerance_ms: float + # TODO: add "interp" — do it per-stream (lerp low-dim vectors, force nearest + # for ndim>=3 images, can't blend frames). Only "nearest" is wired today. + strategy: Literal["nearest"] = "nearest" + action_shift: int = 1 + + +class OutputConfig(BaseConfig): + format: Literal["lerobot", "hdf5"] = "lerobot" + path: Path + metadata: dict[str, Any] = Field(default_factory=dict) + + +class DataPrepConfig(BaseConfig): + """Everything needed to turn a recording into a dataset. + + `source` is a recording `.db`; `observation`/`action` map dataset feature + names to recorded streams; `sync` resamples them onto a common timeline; + `output` selects format + path. Consumed by `build.run_dataprep`. + """ + + source: str = "" + episodes: EpisodeExtractor = EpisodeExtractor() + observation: dict[str, StreamField] = Field(default_factory=dict) + action: dict[str, StreamField] = Field(default_factory=dict) + sync: SyncConfig = SyncConfig(anchor="image", rate_hz=30.0, tolerance_ms=50.0) + output: OutputConfig = OutputConfig(format="lerobot", path=Path("data/datasets/default")) + + +# ───────────────────────────────────────────────────────────────────────────── +# Data records +# ───────────────────────────────────────────────────────────────────────────── + + +class Episode(BaseModel): + id: str + start_ts: float + end_ts: float + task_label: str | None = None + success: bool = True + metadata: dict[str, Any] = Field(default_factory=dict) + + +class Sample(BaseModel): + model_config = ConfigDict(arbitrary_types_allowed=True) + + ts: float + episode_id: str + observation: dict[str, np.ndarray] + action: dict[str, np.ndarray] + task_label: str | None = None # carried from the episode for multi-task datasets + + +# ───────────────────────────────────────────────────────────────────────────── +# Pure helpers — used by format writers and run_dataprep +# ───────────────────────────────────────────────────────────────────────────── + + +def resolve_field(msg: Any, ref: StreamField) -> np.ndarray: + """Project `msg` through `ref` (attribute access) and coerce to ndarray. + + Single source of truth for obs/action construction across train and + live inference. Behavior: + - `ref.field is None`: best-effort coerce the whole message + (Image → `.data`, ndarray pass-through, list/tuple → asarray). + - `ref.field` set: `getattr(msg, ref.field)` (or `msg[ref.field]` + for dict payloads) then coerce. + """ + if ref.field is None: + value: Any = msg + elif isinstance(msg, dict): + value = msg[ref.field] + else: + value = getattr(msg, ref.field) + + if isinstance(value, np.ndarray): + return value + if hasattr(value, "data") and isinstance(value.data, np.ndarray): + # e.g. Image → use its underlying ndarray + return value.data + return np.asarray(value) + + +def extract_episodes(store: SqliteStore, cfg: EpisodeExtractor) -> list[Episode]: + """Walk recorded events into Episodes per the configured strategy. + + EPISODE_STATUS: scan `cfg.status_stream` for state transitions emitted + by `EpisodeMonitorModule`. State machine (mirrors the live monitor): + ev.last_event == "start": begin (auto-commit any prior pending) + ev.last_event == "save": commit (success=True) + ev.last_event == "discard": drop (success=False) + end of stream with pending: dropped (matches live spec) + + RANGES: emit one Episode per (start, end) tuple in `cfg.ranges`. + """ + if cfg.extractor == "ranges": + if not cfg.ranges: + return [] + return [ + Episode(id=f"ep_{i:06d}", start_ts=t0, end_ts=t1) + for i, (t0, t1) in enumerate(cfg.ranges) + ] + + # episode_status (default) + status_stream: Stream[Any, Any] = store.stream(cfg.status_stream) + events = list(status_stream) # observations in storage order + + episodes: list[Episode] = [] + pending_start_ts: float | None = None + pending_label: str | None = None + counter = 0 + + def _commit(end_ts: float, success: bool, label: str | None) -> None: + nonlocal counter, pending_start_ts, pending_label + if pending_start_ts is None: + return + episodes.append( + Episode( + id=f"ep_{counter:06d}", + start_ts=pending_start_ts, + end_ts=end_ts, + task_label=label, + success=success, + ) + ) + counter += 1 + pending_start_ts = None + pending_label = None + + for obs in events: + ev = obs.data + last_event = getattr(ev, "last_event", None) + ts = obs.ts + label = getattr(ev, "task_label", None) + + if last_event == "start": + # Auto-commit any prior pending episode (success=True per state-machine spec). + _commit(ts, success=True, label=pending_label) + # obs.ts is the press time — the recorder stamps EpisodeStatus from + # its own `.ts` field (set at the button press, not at record time). + pending_start_ts = ts + pending_label = label + elif last_event == "save": + _commit(ts, success=True, label=pending_label or label) + elif last_event == "discard": + _commit(ts, success=False, label=pending_label or label) + # "init" and unknown events are no-ops. + + # Anything still pending at end-of-stream is dropped (state-machine spec). + return episodes + + +def iter_episode_samples( + store: SqliteStore, + episode: Episode, + streams: dict[str, StreamField], # observation ∪ action + sync: SyncConfig, + obs_keys: set[str] | None = None, + action_keys: set[str] | None = None, +) -> Iterator[Sample]: + """Yield synced (obs, action) Samples for one episode. + + Walks the anchor stream at `sync.rate_hz` between `episode.start_ts` and + `episode.end_ts`. For each anchor timestamp, picks the nearest sample + from each configured stream within `sync.tolerance_ms`. Skips frames + where any required stream lacks a nearby sample. + + `obs_keys` / `action_keys` partition `streams` into observation vs + action. If omitted, every key is treated as observation (used by + callers that only need raw aligned data). + + With `sync.action_shift > 0` (default 1), each frame's action is taken + `action_shift` frames later (next-state target); the tail is dropped. + """ + if sync.anchor not in streams: + raise ValueError(f"sync.anchor {sync.anchor!r} not in streams: {sorted(streams)}") + + obs_keys = obs_keys if obs_keys is not None else set(streams) + action_keys = action_keys if action_keys is not None else set() + + tolerance_s = sync.tolerance_ms / 1000.0 + + # Materialize each stream's (timestamps, messages) once per episode. + cached: dict[str, tuple[list[float], list[Any]]] = {} + for key, ref in streams.items(): + sub: Stream[Any, Any] = store.stream(ref.stream).time_range( + episode.start_ts, episode.end_ts + ) + ts_list: list[float] = [] + msg_list: list[Any] = [] + for obs in sub: + ts_list.append(obs.ts) + msg_list.append(obs.data) + # Keep them sorted by time — query order is usually already sorted, but be safe. + if ts_list and any(ts_list[i] > ts_list[i + 1] for i in range(len(ts_list) - 1)): + order = sorted(range(len(ts_list)), key=ts_list.__getitem__) + ts_list = [ts_list[i] for i in order] + msg_list = [msg_list[i] for i in order] + cached[key] = (ts_list, msg_list) + + anchor_ts, _ = cached[sync.anchor] + if not anchor_ts: + return + + # Build the sequence of target timestamps for this episode. + if sync.rate_hz > 0: + # Uniform 1/rate_hz grid, phase-locked to the first anchor sample — + # what LeRobot expects (it assumes contiguous fixed-fps frames). + period = 1.0 / sync.rate_hz + targets: list[float] = [] + t = anchor_ts[0] + end = anchor_ts[-1] + while t <= end: + targets.append(t) + t += period + else: + # rate_hz=0: follow the anchor's own timestamps (no image resampling). + # dt is irregular if the camera jitters — fine for hdf5/custom trainers, + # but not LeRobot-uniform. + targets = list(anchor_ts) + + def _nearest(key: str, t: float) -> Any | None: + ts_list, msg_list = cached[key] + if not ts_list: + return None + # Nearest is i (first sample ≥ t) or i-1 (last sample < t). + i = bisect.bisect_left(ts_list, t) + if i == 0: + best = 0 + elif i == len(ts_list): + best = i - 1 + else: + best = i if (ts_list[i] - t) < (t - ts_list[i - 1]) else i - 1 + return msg_list[best] if abs(ts_list[best] - t) <= tolerance_s else None + + def _build_frames() -> Iterator[Sample]: + for t in targets: + obs_dict: dict[str, np.ndarray] = {} + act_dict: dict[str, np.ndarray] = {} + skip = False + for key, ref in streams.items(): + msg = _nearest(key, t) + if msg is None: + skip = True + break + arr = resolve_field(msg, ref) + if arr.ndim < 3: + arr = arr.astype(np.float32, copy=False) + if key in action_keys: + act_dict[key] = arr + elif key in obs_keys: + obs_dict[key] = arr + if skip: + continue + yield Sample( + ts=t, + episode_id=episode.id, + observation=obs_dict, + action=act_dict, + task_label=episode.task_label, + ) + + shift = max(0, sync.action_shift) + if shift == 0 or not action_keys: + yield from _build_frames() + return + + # frame i keeps its obs but takes frame i+shift's action; tail dropped. + frames = list(_build_frames()) + for i in range(len(frames) - shift): + cur = frames[i] + nxt = frames[i + shift] + yield Sample( + ts=cur.ts, + episode_id=cur.episode_id, + observation=cur.observation, + action=nxt.action, + task_label=cur.task_label, + ) + + +def get_writer(format_name: str) -> Writer: + """Lazy-import the format writer's `write` function.""" + if format_name == "lerobot": + from dimos.learning.dataprep.formats.lerobot import write + elif format_name == "hdf5": + from dimos.learning.dataprep.formats.hdf5 import write + else: + raise ValueError(f"Unknown format: {format_name!r}") + return write + + +def get_inspector(format_name: str) -> Inspector: + """Lazy-import the format reader's `inspect` function.""" + if format_name == "lerobot": + from dimos.learning.dataprep.formats.lerobot import inspect + elif format_name == "hdf5": + from dimos.learning.dataprep.formats.hdf5 import inspect + else: + raise ValueError(f"Unknown format: {format_name!r}") + return inspect + + +def summarize_lengths(lengths: list[int]) -> dict[str, Any]: + """Min/max/mean of per-episode frame counts + whether they're all equal.""" + if not lengths: + return {"min": 0, "max": 0, "mean": 0.0, "uniform": True} + return { + "min": min(lengths), + "max": max(lengths), + "mean": sum(lengths) / len(lengths), + "uniform": min(lengths) == max(lengths), + } diff --git a/dimos/learning/dataprep/example_config.json b/dimos/learning/dataprep/example_config.json new file mode 100644 index 0000000000..6e173afad8 --- /dev/null +++ b/dimos/learning/dataprep/example_config.json @@ -0,0 +1,38 @@ +{ + "source": "data/recordings/session.db", + "episodes": { + "extractor": "episode_status", + "status_stream": "status" + }, + "observation": { + "image": { + "stream": "color_image", + "field": "data" + }, + "joint_state": { + "stream": "joint_state", + "field": "position" + } + }, + "action": { + "joint_target": { + "stream": "joint_state", + "field": "position" + } + }, + "sync": { + "anchor": "image", + "rate_hz": 14.0, + "tolerance_ms": 80.0, + "action_shift": 1 + }, + "output": { + "format": "lerobot", + "path": "data/datasets/session", + "metadata": { + "fps": 14, + "robot": "xarm7", + "default_task_label": "pick_and_place" + } + } +} diff --git a/dimos/learning/dataprep/formats/_stats.py b/dimos/learning/dataprep/formats/_stats.py new file mode 100644 index 0000000000..0793b2f1cb --- /dev/null +++ b/dimos/learning/dataprep/formats/_stats.py @@ -0,0 +1,123 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Streaming feature stats — shared by every format writer. + +Welford for mean/std and a reservoir sample for q01/q99 over scalar / +low-dim features. Image-like (≥3D) features are subsampled and reduced +to per-channel summaries so per-pixel stats don't blow up memory. +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +import random +from typing import Any + +import numpy as np + + +@dataclass +class FeatureAggregator: + is_image: bool + n: int = 0 + mean: np.ndarray | None = None + m2: np.ndarray | None = None + minv: np.ndarray | None = None + maxv: np.ndarray | None = None + reservoir: list[np.ndarray] = field(default_factory=list) + image_seen: int = 0 + shape: tuple[int, ...] | None = None + dtype: str | None = None + + +class StreamingStats: + """Single-pass mean/std/min/max/quantile aggregator across many features.""" + + def __init__( + self, image_subsample: int = 10, quantile_reservoir: int = 10_000, seed: int = 0 + ) -> None: + self.image_subsample = image_subsample + self.quantile_reservoir = quantile_reservoir + self._rng = random.Random(seed) + self.aggs: dict[str, FeatureAggregator] = {} + + def update(self, name: str, value: np.ndarray) -> None: + a = np.asarray(value) + is_image = a.ndim >= 3 + agg = self.aggs.setdefault( + name, + FeatureAggregator(is_image=is_image, shape=tuple(a.shape), dtype=str(a.dtype)), + ) + + if is_image: + agg.image_seen += 1 + if (agg.image_seen - 1) % self.image_subsample != 0: + return + v = ( + a.astype(np.float32).mean(axis=(0, 1)) + if a.ndim == 3 + else a.astype(np.float32).reshape(-1) + ) + else: + v = a.astype(np.float64) + + if agg.mean is None: + agg.mean = np.zeros(v.shape, dtype=np.float64) + agg.m2 = np.zeros(v.shape, dtype=np.float64) + agg.minv = np.full(v.shape, np.inf, dtype=np.float64) + agg.maxv = np.full(v.shape, -np.inf, dtype=np.float64) + + agg.n += 1 + delta = v - agg.mean + agg.mean += delta / agg.n + assert agg.m2 is not None + agg.m2 += delta * (v - agg.mean) + assert agg.minv is not None and agg.maxv is not None + np.minimum(agg.minv, v, out=agg.minv) + np.maximum(agg.maxv, v, out=agg.maxv) + + if not is_image: + if len(agg.reservoir) < self.quantile_reservoir: + agg.reservoir.append(v.copy()) + else: + j = self._rng.randint(0, agg.n - 1) + if j < self.quantile_reservoir: + agg.reservoir[j] = v.copy() + + def finalize(self) -> dict[str, dict[str, Any]]: + out: dict[str, dict[str, Any]] = {} + for name, agg in self.aggs.items(): + if agg.mean is None: + continue + n = max(1, agg.n) + assert agg.m2 is not None + var = agg.m2 / n if agg.n > 1 else np.zeros_like(agg.mean) + std = np.sqrt(var) + entry: dict[str, Any] = { + "mean": agg.mean.tolist(), + "std": std.tolist(), + "min": agg.minv.tolist() if agg.minv is not None else None, + "max": agg.maxv.tolist() if agg.maxv is not None else None, + "count": int(agg.n), + } + if agg.reservoir: + stacked = np.stack(agg.reservoir, axis=0) + entry["q01"] = np.quantile(stacked, 0.01, axis=0).tolist() + entry["q99"] = np.quantile(stacked, 0.99, axis=0).tolist() + out[name] = entry + return out + + +__all__ = ["FeatureAggregator", "StreamingStats"] diff --git a/dimos/learning/dataprep/formats/hdf5.py b/dimos/learning/dataprep/formats/hdf5.py new file mode 100644 index 0000000000..ba10924669 --- /dev/null +++ b/dimos/learning/dataprep/formats/hdf5.py @@ -0,0 +1,200 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""HDF5 dataset writer. + +Single ``.hdf5`` file with one group per episode plus a stats group. +Layout:: + + .hdf5 (or output.path. if a dir was given) + / attrs: codebase_version, robot, fps, + num_episodes, num_frames, num_tasks + /tasks attrs: task_ = "" + /stats/ attrs: count + datasets mean/std/min/max[/q01/q99] + /episodes/episode_NNNNNN + timestamp (T,) float32 + (T, ...) as recorded + (T, ...) as recorded + attrs: length, start_ts, task_index + +This is the ACT-original style adapted to one file with multiple episodes. +""" + +from __future__ import annotations + +from collections.abc import Iterator +from pathlib import Path +from typing import Any + +import numpy as np + +from dimos.learning.dataprep.core import OutputConfig, Sample +from dimos.learning.dataprep.formats._stats import StreamingStats + + +def write(samples: Iterator[Sample], output: OutputConfig) -> Path: + """Drain `samples` into a single .hdf5 file. Returns the file path.""" + try: + import h5py + except ImportError as e: + raise RuntimeError("HDF5 writer requires h5py — install with `pip install h5py`") from e + + out = Path(output.path) + if out.suffix not in (".h5", ".hdf5"): + out = out.with_suffix(".hdf5") + out.parent.mkdir(parents=True, exist_ok=True) + + stats = StreamingStats( + image_subsample=int(output.metadata.get("image_subsample", 10)), + quantile_reservoir=int(output.metadata.get("quantile_reservoir", 10_000)), + seed=int(output.metadata.get("stats_seed", 0)), + ) + + default_task_label: str = output.metadata.get("default_task_label", "task") + fps = float(output.metadata.get("fps", 30.0)) + + tasks_index: dict[str, int] = {} + + # Per-episode buffers — flushed at episode boundary. + cur_id: str | None = None + cur_idx = 0 + cur_task = default_task_label # actual label for the in-progress episode + cur_start_ts: float | None = None + buf_ts: list[float] = [] + buf_obs: dict[str, list[np.ndarray]] = {} + buf_act: dict[str, list[np.ndarray]] = {} + + total_frames = 0 + + with h5py.File(out, "w") as h5: + episodes_g = h5.create_group("episodes") + + def _flush() -> bool: + if not buf_ts: + return False + ep = episodes_g.create_group(f"episode_{cur_idx:06d}") + ep.attrs["length"] = len(buf_ts) + ep.attrs["start_ts"] = float(cur_start_ts or 0.0) + ep.attrs["task_index"] = tasks_index[cur_task] + ep.create_dataset("timestamp", data=np.asarray(buf_ts, dtype=np.float32)) + for k, frames in buf_obs.items(): + arr = np.stack(frames, axis=0) + ep.create_dataset( + f"observation/{k}", + data=arr, + compression="gzip" if arr.ndim >= 3 else None, + compression_opts=4 if arr.ndim >= 3 else None, + ) + for k, frames in buf_act.items(): + ep.create_dataset(f"action/{k}", data=np.stack(frames, axis=0)) + buf_ts.clear() + buf_obs.clear() + buf_act.clear() + return True + + for sample in samples: + if sample.episode_id != cur_id: + if _flush(): + cur_idx += 1 + cur_id = sample.episode_id + cur_start_ts = float(sample.ts) + cur_task = sample.task_label or default_task_label + if cur_task not in tasks_index: + tasks_index[cur_task] = len(tasks_index) + + buf_ts.append(float(sample.ts) - (cur_start_ts or 0.0)) + for k, v in sample.observation.items(): + a = np.asarray(v) + buf_obs.setdefault(k, []).append(a) + stats.update(f"observation.{k}", a) + for k, v in sample.action.items(): + a = np.asarray(v) + buf_act.setdefault(k, []).append(a) + stats.update(f"action.{k}", a) + total_frames += 1 + + _flush() + + # ── meta ──────────────────────────────────────────────────────────── + h5.attrs["codebase_version"] = "dimos-v1" + h5.attrs["robot"] = output.metadata.get("robot", "unknown") + h5.attrs["fps"] = fps + h5.attrs["num_episodes"] = len(episodes_g) + h5.attrs["num_frames"] = total_frames + h5.attrs["num_tasks"] = len(tasks_index) + + tasks_g = h5.create_group("tasks") + for task, idx in tasks_index.items(): + tasks_g.attrs[f"task_{idx}"] = task + + stats_g = h5.create_group("stats") + for name, entry in stats.finalize().items(): + g = stats_g.create_group(name) + g.attrs["count"] = entry["count"] + for k in ("mean", "std", "min", "max", "q01", "q99"): + if k in entry and entry[k] is not None: + g.create_dataset(k, data=np.asarray(entry[k], dtype=np.float64)) + + return out + + +def inspect(path: Path) -> dict[str, Any]: + """Summarize an .hdf5 dataset: features (per-frame shape/dtype), episode + counts, and whether feature shapes are uniform across episodes.""" + try: + import h5py + except ImportError as e: + raise RuntimeError("HDF5 inspect requires h5py — install with `pip install h5py`") from e + + from dimos.learning.dataprep.core import summarize_lengths + + out = Path(path) + with h5py.File(out, "r") as h5: + eps_g = h5["episodes"] + ep_names = sorted(eps_g.keys()) + lengths = [int(eps_g[e].attrs.get("length", 0)) for e in ep_names] + + observation: dict[str, Any] = {} + action: dict[str, Any] = {} + # Feature schema from the first episode (per-frame shape = dataset.shape[1:]). + if ep_names: + first = eps_g[ep_names[0]] + for grp, ref in (("observation", observation), ("action", action)): + if grp in first: + for k, d in first[grp].items(): + ref[k] = {"shape": list(d.shape[1:]), "dtype": str(d.dtype)} + + # Are per-frame shapes consistent across every episode? + shapes_uniform = True + for ep_name in ep_names[1:]: + g = eps_g[ep_name] + for grp, ref in (("observation", observation), ("action", action)): + if grp in g: + for k, d in g[grp].items(): + if k in ref and list(d.shape[1:]) != ref[k]["shape"]: + shapes_uniform = False + + return { + "format": "hdf5", + "path": str(out), + "episodes": int(h5.attrs.get("num_episodes", len(ep_names))), + "frames": int(h5.attrs.get("num_frames", sum(lengths))), + "fps": float(h5.attrs.get("fps", 0.0)), + "robot": str(h5.attrs.get("robot", "unknown")), + "observation": observation, + "action": action, + "episode_lengths": summarize_lengths(lengths), + "shapes_uniform": shapes_uniform, + "has_stats": "stats" in h5, + } diff --git a/dimos/learning/dataprep/formats/lerobot.py b/dimos/learning/dataprep/formats/lerobot.py new file mode 100644 index 0000000000..db9d4eeb50 --- /dev/null +++ b/dimos/learning/dataprep/formats/lerobot.py @@ -0,0 +1,484 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""LeRobot v3.0 dataset writer. + +v3.0 differs structurally from v2.x: instead of one parquet + one MP4 *per +episode*, episodes are **concatenated** into shared chunked files, and all +per-episode bookkeeping (frame/byte ranges, video time offsets, per-episode +stats) moves into an episodes *parquet*. + +Layout:: + + / + meta/info.json schema, fps, totals, features + meta/tasks.parquet task strings (indexed by `task`) + meta/stats.json aggregated per-feature stats + meta/episodes/chunk-000/file-000.parquet one row per episode (+ stats) + data/chunk-000/file-000.parquet ALL episodes' frames concatenated + videos//chunk-000/file-000.mp4 ALL episodes for a camera, concatenated + +This writer emits a **single** data file and a single MP4 per camera (chunk +000 / file 000); LeRobot supports multi-file rolling at size limits, which we +don't need yet (logged if a soft limit is exceeded). A frame's `timestamp` is +relative to its episode; the episode's `videos//from_timestamp` gives its +offset inside the shared MP4, so `from_timestamp + timestamp` locates the frame. +""" + +from __future__ import annotations + +from collections.abc import Iterator +import json +from pathlib import Path +from typing import Any + +import numpy as np + +from dimos.learning.dataprep.core import OutputConfig, Sample +from dimos.learning.dataprep.formats._stats import StreamingStats +from dimos.utils.logging_config import setup_logger + +logger = setup_logger() + +CHUNK = "chunk-000" +FILE = "file-000" +DATA_DIR = "data" +VIDEO_DIR = "videos" +META_DIR = "meta" +EPISODES_DIR = "episodes" + +# LeRobot defaults; we write a single file but warn past these soft limits. +DATA_FILE_SIZE_MB = 100 +VIDEO_FILE_SIZE_MB = 200 +CHUNKS_SIZE = 1000 + + +def _feature_name( + prefix: str, key: str, is_image: bool, single_action: bool, single_state: bool = False +) -> str: + """Translate (prefix, key) into the LeRobot feature name. + + Canonical names lerobot policies (ACT, Diffusion, π₀) expect: + observation.state single proprio vector + action single action vector + observation.images. per-camera RGB + Multi-key fallbacks: ``observation.`` / ``action.``. + """ + if prefix == "action" and single_action: + return "action" + if is_image: + return f"observation.images.{key}" + if prefix == "observation" and single_state: + return "observation.state" + if prefix == "observation": + return f"observation.{key}" + return f"action.{key}" + + +def _nest_image_stat(vals: list[float]) -> list[list[list[float]]]: + """Per-channel [c0,c1,c2] → shape (C,1,1) [[[c0]],[[c1]],[[c2]]] (lerobot image stats).""" + return [[[float(c)]] for c in vals] + + +def _flatten_episode_stats( + final: dict[str, dict[str, Any]], feature_dtypes: dict[str, str] +) -> dict[str, Any]: + """Flatten a per-episode StreamingStats result into ``stats//`` columns. + + Image features get the (C,1,1) nesting lerobot expects; low-dim stay flat. + """ + out: dict[str, Any] = {} + for feat, entry in final.items(): + is_video = feature_dtypes.get(feat) == "video" + for k in ("mean", "std", "min", "max"): + v = entry.get(k) + if v is None: + continue + out[f"stats/{feat}/{k}"] = _nest_image_stat(v) if is_video else v + out[f"stats/{feat}/count"] = int(entry["count"]) + for q in ("q01", "q99"): + if q in entry: + out[f"stats/{feat}/{q}"] = _nest_image_stat(entry[q]) if is_video else entry[q] + return out + + +def write(samples: Iterator[Sample], output: OutputConfig) -> Path: + """Drain `samples`, write a LeRobot v3.0 dataset. Returns the dataset root path.""" + try: + import cv2 + except ImportError as e: + raise RuntimeError("LeRobot writer requires opencv-python (cv2) for MP4 encoding") from e + try: + import pyarrow as pa + import pyarrow.parquet as pq + except ImportError as e: + raise RuntimeError("LeRobot writer requires pyarrow for parquet writes") from e + try: + import pandas as pd + except ImportError as e: + raise RuntimeError("LeRobot writer requires pandas for tasks.parquet") from e + + root = Path(output.path) + (root / META_DIR / EPISODES_DIR / CHUNK).mkdir(parents=True, exist_ok=True) + (root / DATA_DIR / CHUNK).mkdir(parents=True, exist_ok=True) + + fps = float(output.metadata.get("fps", 30.0)) + fourcc = cv2.VideoWriter.fourcc(*"mp4v") + default_task_label = output.metadata.get("default_task_label", "task") + + def _stats() -> StreamingStats: + return StreamingStats( + image_subsample=int(output.metadata.get("image_subsample", 10)), + quantile_reservoir=int(output.metadata.get("quantile_reservoir", 10_000)), + seed=int(output.metadata.get("stats_seed", 0)), + ) + + global_stats = _stats() # aggregated across all frames → meta/stats.json + + # Schema discovery (filled as samples flow). + image_keys: list[str] = [] + state_keys: list[str] = [] + action_keys: list[str] = [] + feature_shapes: dict[str, tuple[int, ...]] = {} + feature_dtypes: dict[str, str] = {} + + tasks_index: dict[str, int] = {} + episode_rows: list[dict[str, Any]] = [] + + # Single concatenated data file (opened on first flush). + data_path = root / DATA_DIR / CHUNK / f"{FILE}.parquet" + data_writer: Any = None + + # One MP4 per camera, persisting across episodes; from/to timestamps per episode. + video_writers: dict[str, Any] = {} + video_cum_frames: dict[str, int] = {} # frames written per camera so far + + global_index = 0 + episode_index = -1 + + # Per-episode buffers. + cur_id: str | None = None + cur_rows: list[dict[str, Any]] = [] + cur_ep_stats = _stats() + cur_task = default_task_label # actual label for the in-progress episode + + def _video_path(image_key: str) -> Path: + feat = _feature_name("observation", image_key, is_image=True, single_action=False) + d = root / VIDEO_DIR / feat / CHUNK + d.mkdir(parents=True, exist_ok=True) + return d / f"{FILE}.mp4" + + def _open_video(image_key: str, frame: np.ndarray) -> Any: + h, w = frame.shape[:2] + path = _video_path(image_key) + vw = cv2.VideoWriter(str(path), fourcc, fps, (w, h)) + if not vw.isOpened(): + raise RuntimeError(f"Failed to open VideoWriter for {path}") + return vw + + def _flush_episode() -> None: + nonlocal data_writer + if not cur_rows: + return + length = len(cur_rows) + single_state = len(state_keys) == 1 + single_action = len(action_keys) == 1 + + cols: dict[str, Any] = { + "timestamp": pa.array([r["timestamp"] for r in cur_rows], pa.float32()), + "frame_index": pa.array([r["frame_index"] for r in cur_rows], pa.int64()), + "episode_index": pa.array([r["episode_index"] for r in cur_rows], pa.int64()), + "index": pa.array([r["index"] for r in cur_rows], pa.int64()), + "task_index": pa.array([r["task_index"] for r in cur_rows], pa.int64()), + } + f32_list = pa.list_(pa.float32()) + for k in state_keys: + name = _feature_name("observation", k, False, False, single_state=single_state) + cols[name] = pa.array([r["obs"][k].tolist() for r in cur_rows], type=f32_list) + for k in action_keys: + name = _feature_name("action", k, False, single_action=single_action) + cols[name] = pa.array([r["act"][k].tolist() for r in cur_rows], type=f32_list) + table = pa.Table.from_pydict(cols) + if data_writer is None: + data_writer = pq.ParquetWriter(data_path, table.schema, compression="snappy") + data_writer.write_table(table) + + # Episode metadata row. + row: dict[str, Any] = { + "episode_index": episode_index, + "tasks": [list(tasks_index.keys())[cur_rows[0]["task_index"]]], + "length": length, + "data/chunk_index": 0, + "data/file_index": 0, + "dataset_from_index": global_index - length, + "dataset_to_index": global_index, + "meta/episodes/chunk_index": 0, + "meta/episodes/file_index": 0, + } + for k in image_keys: + feat = _feature_name("observation", k, is_image=True, single_action=False) + cum = video_cum_frames.get(k, 0) + row[f"videos/{feat}/chunk_index"] = 0 + row[f"videos/{feat}/file_index"] = 0 + row[f"videos/{feat}/from_timestamp"] = (cum - length) / fps + row[f"videos/{feat}/to_timestamp"] = cum / fps + row.update(_flatten_episode_stats(cur_ep_stats.finalize(), feature_dtypes)) + episode_rows.append(row) + cur_rows.clear() + + # try/finally so the parquet footer is written and MP4s are released even if + # the drain raises mid-stream — otherwise the data file is unreadable (no + # footer) and the videos lose their index. + try: + for sample in samples: + if sample.episode_id != cur_id: + _flush_episode() + cur_id = sample.episode_id + episode_index += 1 + cur_ep_stats = _stats() + # Per-episode task label (falls back to the config default). + cur_task = sample.task_label or default_task_label + if cur_task not in tasks_index: + tasks_index[cur_task] = len(tasks_index) + + # Schema discovery + stats (global + per-episode). + n_low_dim_obs = sum(1 for v in sample.observation.values() if np.asarray(v).ndim < 3) + single_state = n_low_dim_obs == 1 + for k, arr in sample.observation.items(): + a = np.asarray(arr) + is_image = a.ndim >= 3 + name = _feature_name("observation", k, is_image, False, single_state=single_state) + if name not in feature_shapes: + feature_shapes[name] = tuple(a.shape) + feature_dtypes[name] = "video" if is_image else str(a.dtype) + if is_image: + if k not in image_keys: + image_keys.append(k) + elif k not in state_keys: + state_keys.append(k) + global_stats.update(name, a) + cur_ep_stats.update(name, a) + single_action = len(sample.action) == 1 + for k, arr in sample.action.items(): + a = np.asarray(arr) + name = _feature_name("action", k, is_image=False, single_action=single_action) + if name not in feature_shapes: + feature_shapes[name] = tuple(a.shape) + feature_dtypes[name] = str(a.dtype) + if k not in action_keys: + action_keys.append(k) + global_stats.update(name, a) + cur_ep_stats.update(name, a) + + # Append image frames to the per-camera MP4 (RGB→BGR; cv2 is BGR-native). + for k, arr in sample.observation.items(): + a = np.asarray(arr) + if a.ndim >= 3: + if k not in video_writers: + video_writers[k] = _open_video(k, a) + bgr = cv2.cvtColor(a, cv2.COLOR_RGB2BGR) if a.shape[-1] == 3 else a + video_writers[k].write(bgr) + video_cum_frames[k] = video_cum_frames.get(k, 0) + 1 + + frame_index = len(cur_rows) + cur_rows.append( + { + "timestamp": frame_index / fps, # relative to this episode + "frame_index": frame_index, + "episode_index": episode_index, + "index": global_index, + "task_index": tasks_index[cur_task], + "obs": { + k: np.asarray(v) + for k, v in sample.observation.items() + if np.asarray(v).ndim < 3 + }, + "act": {k: np.asarray(v) for k, v in sample.action.items()}, + } + ) + global_index += 1 + + _flush_episode() + finally: + if data_writer is not None: + data_writer.close() + for vw in video_writers.values(): + vw.release() + + total_episodes = len(episode_rows) + total_frames = global_index + if data_path.exists() and data_path.stat().st_size > DATA_FILE_SIZE_MB * 1e6: + logger.warning( + "[dataprep] data file exceeds %d MB (single-file writer, no rolling): %s", + DATA_FILE_SIZE_MB, + data_path, + ) + + _write_meta( + root, + fps=fps, + total_episodes=total_episodes, + total_frames=total_frames, + feature_shapes=feature_shapes, + feature_dtypes=feature_dtypes, + image_keys=image_keys, + tasks_index=tasks_index, + episode_rows=episode_rows, + global_stats=global_stats, + robot=output.metadata.get("robot", "unknown"), + pa=pa, + pq=pq, + pd=pd, + ) + return root + + +def _write_meta( + root: Path, + *, + fps: float, + total_episodes: int, + total_frames: int, + feature_shapes: dict[str, tuple[int, ...]], + feature_dtypes: dict[str, str], + image_keys: list[str], + tasks_index: dict[str, int], + episode_rows: list[dict[str, Any]], + global_stats: StreamingStats, + robot: str, + pa: Any, + pq: Any, + pd: Any, +) -> None: + """Write info.json, tasks.parquet, episodes parquet, and aggregated stats.json.""" + features: dict[str, Any] = {} + for name, shape in feature_shapes.items(): + if feature_dtypes[name] == "video": + features[name] = { + "dtype": "video", + "shape": list(shape), + "names": ["height", "width", "channel"], + "info": { + "video.fps": fps, + "video.height": int(shape[0]), + "video.width": int(shape[1]), + "video.channels": int(shape[2]) if len(shape) > 2 else 3, + "video.codec": "mp4v", + "video.pix_fmt": "yuv420p", + "video.is_depth_map": False, + "has_audio": False, + }, + } + else: + n = int(shape[0]) if shape else 0 + base = name.split(".")[-1] + features[name] = { + "dtype": feature_dtypes[name], + "shape": list(shape), + "names": [f"{base}_{i}" for i in range(n)], + } + for col, dt in [ + ("timestamp", "float32"), + ("frame_index", "int64"), + ("episode_index", "int64"), + ("index", "int64"), + ("task_index", "int64"), + ]: + features[col] = {"dtype": dt, "shape": [1], "names": None} + + info = { + "codebase_version": "v3.0", + "robot_type": robot, + "total_episodes": total_episodes, + "total_frames": total_frames, + "total_tasks": len(tasks_index), + "chunks_size": CHUNKS_SIZE, + "data_files_size_in_mb": DATA_FILE_SIZE_MB, + "video_files_size_in_mb": VIDEO_FILE_SIZE_MB, + "fps": fps, + "splits": {"train": f"0:{total_episodes}"}, + "data_path": "data/chunk-{chunk_index:03d}/file-{file_index:03d}.parquet", + "video_path": "videos/{video_key}/chunk-{chunk_index:03d}/file-{file_index:03d}.mp4", + "features": features, + } + with open(root / META_DIR / "info.json", "w") as f: + json.dump(info, f, indent=2) + + # tasks.parquet — task strings as the (named) index + a task_index column. + tasks_df = pd.DataFrame( + {"task_index": list(tasks_index.values())}, + index=pd.Index(list(tasks_index.keys()), name="task"), + ) + tasks_df.to_parquet(root / META_DIR / "tasks.parquet") + + # episodes parquet — one row per episode (+ flattened per-episode stats). + ep_table = pa.Table.from_pylist(episode_rows) + pq.write_table( + ep_table, root / META_DIR / EPISODES_DIR / CHUNK / f"{FILE}.parquet", compression="snappy" + ) + + # Aggregated stats.json (image features nested to (C,1,1)). + final_stats = global_stats.finalize() + for name, entry in final_stats.items(): + if feature_dtypes.get(name) == "video": + for k in ("mean", "std", "min", "max"): + if entry.get(k) is not None: + entry[k] = _nest_image_stat(entry[k]) + with open(root / META_DIR / "stats.json", "w") as f: + json.dump(final_stats, f, indent=2) + + +_META_COLS = {"timestamp", "frame_index", "episode_index", "index", "task_index"} + + +def inspect(path: Path) -> dict[str, Any]: + """Summarize a LeRobot v3.0 dataset from meta/ (info.json + episodes parquet).""" + import pyarrow.parquet as pq + + from dimos.learning.dataprep.core import summarize_lengths + + root = Path(path) + info = json.loads((root / META_DIR / "info.json").read_text()) + features = info.get("features", {}) + + observation: dict[str, Any] = {} + action: dict[str, Any] = {} + for name, feat in features.items(): + if name in _META_COLS: + continue + entry = {"shape": feat.get("shape"), "dtype": feat.get("dtype")} + if name.startswith("observation"): + observation[name] = entry + elif name.startswith("action"): + action[name] = entry + + lengths: list[int] = [] + ep_file = root / META_DIR / EPISODES_DIR / CHUNK / f"{FILE}.parquet" + if ep_file.exists(): + lengths = pq.read_table(ep_file, columns=["length"]).column("length").to_pylist() + + return { + "format": "lerobot", + "version": info.get("codebase_version"), + "path": str(root), + "episodes": info.get("total_episodes"), + "frames": info.get("total_frames"), + "fps": info.get("fps"), + "robot": info.get("robot_type"), + "observation": observation, + "action": action, + "episode_lengths": summarize_lengths(lengths), + "shapes_uniform": True, # LeRobot declares one global feature schema + "has_stats": (root / META_DIR / "stats.json").exists(), + } diff --git a/dimos/learning/dataprep/formats/test_hdf5.py b/dimos/learning/dataprep/formats/test_hdf5.py new file mode 100644 index 0000000000..336b7674be --- /dev/null +++ b/dimos/learning/dataprep/formats/test_hdf5.py @@ -0,0 +1,86 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Round-trip tests for the HDF5 dataset writer/reader. + +Builds a tiny in-memory Sample stream (no SqliteStore), writes it, then reads +it back through `inspect` and asserts the episode/frame counts, per-feature +shapes, and that stats landed. Skips cleanly if h5py isn't installed (it lives +in the `learning` optional-dependency group). +""" + +from __future__ import annotations + +from collections.abc import Iterator +from pathlib import Path + +import numpy as np +import pytest + +h5py = pytest.importorskip("h5py") + +from dimos.learning.dataprep.core import OutputConfig, Sample +from dimos.learning.dataprep.formats.hdf5 import inspect, write + + +def _samples(n_episodes: int = 2, n_frames: int = 3) -> Iterator[Sample]: + """obs `state` (4-vec) + `action` (2-vec), `n_frames` per episode.""" + for ep in range(n_episodes): + for i in range(n_frames): + yield Sample( + ts=float(i), + episode_id=f"ep_{ep:06d}", + observation={"state": (np.arange(4, dtype=np.float32) + i)}, + action={"action": np.full(2, float(i), dtype=np.float32)}, + ) + + +def test_hdf5_roundtrip_counts_and_shapes(tmp_path: Path) -> None: + out = OutputConfig( + format="hdf5", + path=tmp_path / "session", + metadata={"fps": 20.0, "robot": "xarm7"}, + ) + path = write(_samples(), out) + assert path.suffix == ".hdf5" + assert path.exists() + + info = inspect(path) + assert info["format"] == "hdf5" + assert info["episodes"] == 2 + assert info["frames"] == 6 + assert info["fps"] == 20.0 + assert info["robot"] == "xarm7" + assert info["observation"]["state"]["shape"] == [4] + assert info["action"]["action"]["shape"] == [2] + assert info["shapes_uniform"] is True + assert info["has_stats"] is True + assert info["episode_lengths"] == {"min": 3, "max": 3, "mean": 3.0, "uniform": True} + + +def test_hdf5_extension_appended_when_missing(tmp_path: Path) -> None: + # path with no suffix → writer appends .hdf5 + out = OutputConfig(format="hdf5", path=tmp_path / "noext") + path = write(_samples(n_episodes=1, n_frames=2), out) + assert path.name == "noext.hdf5" + + +def test_hdf5_stats_values_match(tmp_path: Path) -> None: + out = OutputConfig(format="hdf5", path=tmp_path / "s.hdf5") + path = write(_samples(n_episodes=1, n_frames=3), out) + with h5py.File(path, "r") as f: + assert "observation.state" in f["stats"] + # state = [0..3]+i for i in 0,1,2 → per-dim mean = base + mean(0,1,2)=base+1 + mean = f["stats"]["observation.state"]["mean"][:] + np.testing.assert_allclose(mean, np.arange(4) + 1.0) diff --git a/dimos/learning/dataprep/formats/test_lerobot.py b/dimos/learning/dataprep/formats/test_lerobot.py new file mode 100644 index 0000000000..c01f6bebcb --- /dev/null +++ b/dimos/learning/dataprep/formats/test_lerobot.py @@ -0,0 +1,225 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Smoke tests for the LeRobot v3.0 writer/reader. + +Asserts the v3.0 layout: a single concatenated data parquet, parquet meta +(tasks + episodes, no jsonl), and one MP4 per camera under +`videos//chunk-000/`. The image test skips if no mp4v codec is available; +the whole module skips if pyarrow/pandas aren't installed (`learning` extra). +""" + +from __future__ import annotations + +from collections.abc import Iterator +import json +from pathlib import Path + +import numpy as np +import pytest + +pytest.importorskip("pyarrow") +pytest.importorskip("pandas") +cv2 = pytest.importorskip("cv2") + +# Below the importorskip guards above; used to read back v3.0 meta/parquet. +import pandas as pd +import pyarrow.parquet as pq + +from dimos.learning.dataprep.core import OutputConfig, Sample +from dimos.learning.dataprep.formats.lerobot import inspect, write + + +def _state_samples(n: int = 4) -> Iterator[Sample]: + for i in range(n): + yield Sample( + ts=float(i), + episode_id="ep_000000", + observation={"state": np.arange(6, dtype=np.float32)}, + action={"action": np.full(6, float(i), dtype=np.float32)}, + ) + + +def _two_episode_samples() -> Iterator[Sample]: + for ep in range(2): + for i in range(3): + yield Sample( + ts=float(ep * 3 + i), + episode_id=f"ep_{ep:06d}", + observation={"state": np.arange(6, dtype=np.float32) + ep}, + action={"action": np.full(6, float(i), dtype=np.float32)}, + ) + + +def _image_samples(n: int = 4) -> Iterator[Sample]: + for i in range(n): + yield Sample( + ts=float(i), + episode_id="ep_000000", + observation={ + "state": np.arange(6, dtype=np.float32), + "cam": np.full((16, 16, 3), i, dtype=np.uint8), + }, + action={"action": np.zeros(6, dtype=np.float32)}, + ) + + +def test_lerobot_v3_state_only_layout_and_naming(tmp_path: Path) -> None: + out = OutputConfig( + format="lerobot", path=tmp_path / "ds", metadata={"fps": 10.0, "robot": "xarm7"} + ) + root = write(_state_samples(), out) + + # v3.0: concatenated single data file + parquet meta (no jsonl, no per-episode parquet) + assert (root / "meta" / "info.json").exists() + assert (root / "meta" / "tasks.parquet").exists() + assert (root / "meta" / "stats.json").exists() + assert (root / "meta" / "episodes" / "chunk-000" / "file-000.parquet").exists() + assert (root / "data" / "chunk-000" / "file-000.parquet").exists() + assert not (root / "meta" / "episodes.jsonl").exists() + assert not (root / "meta" / "tasks.jsonl").exists() + + info = json.loads((root / "meta" / "info.json").read_text()) + assert info["codebase_version"] == "v3.0" + assert info["total_episodes"] == 1 + assert info["total_frames"] == 4 + assert info["fps"] == 10.0 + assert info["data_path"] == "data/chunk-{chunk_index:03d}/file-{file_index:03d}.parquet" + # single low-dim state + single action → canonical names + assert "observation.state" in info["features"] + assert "action" in info["features"] + + +def test_lerobot_v3_episode_metadata_columns(tmp_path: Path) -> None: + out = OutputConfig(format="lerobot", path=tmp_path / "ds", metadata={"fps": 10.0}) + # two episodes so dataset_from/to_index advance + root = write(_two_episode_samples(), out) + ep = pq.read_table(root / "meta" / "episodes" / "chunk-000" / "file-000.parquet") + cols = set(ep.column_names) + for required in ( + "episode_index", + "tasks", + "length", + "dataset_from_index", + "dataset_to_index", + "data/chunk_index", + "data/file_index", + "meta/episodes/chunk_index", + "meta/episodes/file_index", + ): + assert required in cols, f"missing episode column {required}" + # per-episode stats are embedded (flattened) + assert any(c.startswith("stats/observation.state/") for c in cols) + rows = ep.to_pylist() + assert [r["episode_index"] for r in rows] == [0, 1] + assert rows[0]["dataset_from_index"] == 0 and rows[0]["dataset_to_index"] == 3 + assert rows[1]["dataset_from_index"] == 3 and rows[1]["dataset_to_index"] == 6 + + +def test_lerobot_v3_writer_closed_on_midstream_error(tmp_path: Path) -> None: + """If the drain raises after an episode was flushed, the data parquet must + still be readable (footer written by the finally), not a headerless stub.""" + + def bad_samples() -> Iterator[Sample]: + for i in range(3): # episode 0 + yield Sample( + ts=float(i), + episode_id="ep_000000", + observation={"state": np.arange(6, dtype=np.float32)}, + action={"action": np.full(6, float(i), dtype=np.float32)}, + ) + # first frame of episode 1 flushes episode 0 (opens + writes the parquet)… + yield Sample( + ts=3.0, + episode_id="ep_000001", + observation={"state": np.arange(6, dtype=np.float32)}, + action={"action": np.zeros(6, dtype=np.float32)}, + ) + raise RuntimeError("boom mid-stream") # …then blow up before the final flush + + out = OutputConfig(format="lerobot", path=tmp_path / "ds", metadata={"fps": 10.0}) + with pytest.raises(RuntimeError, match="boom"): + write(bad_samples(), out) + + # episode 0's 3 frames were flushed; the file must have a valid footer. + data = tmp_path / "ds" / "data" / "chunk-000" / "file-000.parquet" + assert data.exists() + assert pq.read_table(data).num_rows == 3 # raises ArrowInvalid if footer missing + + +def test_lerobot_v3_per_episode_task_labels(tmp_path: Path) -> None: + """Episodes with distinct task_labels must produce distinct tasks + task_index + (multi-task recordings must not collapse to one task).""" + + def samples() -> Iterator[Sample]: + for ep, task in ((0, "pick"), (1, "place")): + for i in range(3): + yield Sample( + ts=float(ep * 3 + i), + episode_id=f"ep_{ep:06d}", + observation={"state": np.arange(6, dtype=np.float32)}, + action={"action": np.zeros(6, dtype=np.float32)}, + task_label=task, + ) + + out = OutputConfig(format="lerobot", path=tmp_path / "ds", metadata={"fps": 10.0}) + root = write(samples(), out) + + tasks = pd.read_parquet(root / "meta" / "tasks.parquet") + assert set(tasks.index) == {"pick", "place"} + + ep = pq.read_table(root / "meta" / "episodes" / "chunk-000" / "file-000.parquet").to_pylist() + assert ep[0]["tasks"] == ["pick"] + assert ep[1]["tasks"] == ["place"] + + data = pq.read_table(root / "data" / "chunk-000" / "file-000.parquet") + ti = data.column("task_index").to_pylist() + assert ti[:3] == [0, 0, 0] # episode 0 → task 0 (pick) + assert ti[3:] == [1, 1, 1] # episode 1 → task 1 (place) + + +def test_lerobot_v3_inspect_state_only(tmp_path: Path) -> None: + out = OutputConfig(format="lerobot", path=tmp_path / "ds", metadata={"fps": 10.0}) + root = write(_state_samples(), out) + info = inspect(root) + assert info["format"] == "lerobot" + assert info["version"] == "v3.0" + assert info["episodes"] == 1 + assert info["frames"] == 4 + assert "observation.state" in info["observation"] + assert "action" in info["action"] + assert info["has_stats"] is True + + +def test_lerobot_v3_with_images_writes_concatenated_mp4(tmp_path: Path) -> None: + out = OutputConfig(format="lerobot", path=tmp_path / "ds", metadata={"fps": 10.0}) + try: + root = write(_image_samples(), out) + except RuntimeError as e: + if "VideoWriter" in str(e): + pytest.skip(f"no mp4v encoder available in this environment: {e}") + raise + + # v3.0 video path: videos//chunk-000/file-000.mp4 (key before chunk, one per camera) + mp4 = root / "videos" / "observation.images.cam" / "chunk-000" / "file-000.mp4" + assert mp4.exists() and mp4.stat().st_size > 0 + + info = json.loads((root / "meta" / "info.json").read_text()) + assert ( + info["video_path"] == "videos/{video_key}/chunk-{chunk_index:03d}/file-{file_index:03d}.mp4" + ) + assert info["features"]["observation.images.cam"]["dtype"] == "video" + # image column is excluded from parquet; state/action remain + assert "observation.state" in info["features"] + assert info["total_frames"] == 4 diff --git a/dimos/learning/dataprep/formats/test_stats.py b/dimos/learning/dataprep/formats/test_stats.py new file mode 100644 index 0000000000..d04cd6dc17 --- /dev/null +++ b/dimos/learning/dataprep/formats/test_stats.py @@ -0,0 +1,85 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Unit tests for the streaming feature-stats aggregator (`_stats.py`). + +Pure numpy — no I/O, no optional deps. Verifies the Welford mean/std/min/max, +the low-dim quantile reservoir, and the image per-channel reduction + +subsampling. +""" + +from __future__ import annotations + +import numpy as np + +from dimos.learning.dataprep.formats._stats import StreamingStats + + +def test_scalar_mean_std_minmax_count() -> None: + s = StreamingStats() + for v in ([1.0, 10.0], [2.0, 20.0], [3.0, 30.0]): + s.update("state", np.array(v)) + out = s.finalize()["state"] + # population variance (m2 / n): [1,2,3] → 2/3 ; [10,20,30] → 200/3 + np.testing.assert_allclose(out["mean"], [2.0, 20.0]) + np.testing.assert_allclose(out["std"], [np.sqrt(2 / 3), np.sqrt(200 / 3)]) + assert out["min"] == [1.0, 10.0] + assert out["max"] == [3.0, 30.0] + assert out["count"] == 3 + + +def test_single_sample_has_zero_std() -> None: + s = StreamingStats() + s.update("x", np.array([5.0, 7.0])) + out = s.finalize()["x"] + assert out["std"] == [0.0, 0.0] + assert out["count"] == 1 + + +def test_lowdim_quantiles_present_and_bounded() -> None: + s = StreamingStats() + for i in range(100): + s.update("x", np.array([float(i)])) + out = s.finalize()["x"] + assert "q01" in out and "q99" in out + assert out["min"][0] <= out["q01"][0] <= out["q99"][0] <= out["max"][0] + + +def test_image_reduced_to_per_channel_no_quantiles() -> None: + # image_subsample=1 → every frame counts. Constant per-channel values. + s = StreamingStats(image_subsample=1) + img = np.zeros((4, 4, 3), dtype=np.uint8) + img[..., 0], img[..., 1], img[..., 2] = 10, 20, 30 + for _ in range(5): + s.update("cam", img) + out = s.finalize()["cam"] + np.testing.assert_allclose(out["mean"], [10.0, 20.0, 30.0]) + np.testing.assert_allclose(out["min"], [10.0, 20.0, 30.0]) + np.testing.assert_allclose(out["max"], [10.0, 20.0, 30.0]) + assert out["count"] == 5 + # Images skip the quantile reservoir (per-pixel stats would blow up memory). + assert "q01" not in out and "q99" not in out + + +def test_image_subsampling_counts_every_nth_frame() -> None: + s = StreamingStats(image_subsample=10) + img = np.zeros((2, 2, 3), dtype=np.uint8) + for _ in range(25): + s.update("cam", img) + # frames 0, 10, 20 sampled → count 3 + assert s.finalize()["cam"]["count"] == 3 + + +def test_empty_aggregator_finalizes_empty() -> None: + assert StreamingStats().finalize() == {} diff --git a/dimos/learning/dataprep/test_core.py b/dimos/learning/dataprep/test_core.py new file mode 100644 index 0000000000..dbabe94f16 --- /dev/null +++ b/dimos/learning/dataprep/test_core.py @@ -0,0 +1,341 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Unit tests for the pure DataPrep helpers in `core.py`. + +No I/O: a tiny in-memory fake stands in for `SqliteStore`, exposing only the +surface the helpers touch (`stream(name)` → iterable of `.ts`/`.data` records, +with `.time_range(t0, t1)`). Keeps these fast and dependency-free. +""" + +from __future__ import annotations + +from dataclasses import dataclass +import json +from pathlib import Path +from typing import Any + +import numpy as np +import pytest + +from dimos.learning.dataprep.build import _write_dimos_meta, run_dataprep +from dimos.learning.dataprep.core import ( + DataPrepConfig, + Episode, + EpisodeExtractor, + OutputConfig, + StreamField, + SyncConfig, + extract_episodes, + iter_episode_samples, + resolve_field, + summarize_lengths, +) + +# ── fakes ──────────────────────────────────────────────────────────────────── + + +@dataclass +class _Obs: + ts: float + data: Any + + +class _FakeStream: + def __init__(self, obs: list[_Obs]) -> None: + self._obs = sorted(obs, key=lambda o: o.ts) + + def __iter__(self): + return iter(self._obs) + + def time_range(self, t0: float, t1: float) -> _FakeStream: + return _FakeStream([o for o in self._obs if t0 <= o.ts <= t1]) + + +class _FakeStore: + def __init__(self, streams: dict[str, list[_Obs]]) -> None: + self._streams = {k: _FakeStream(v) for k, v in streams.items()} + + def stream(self, name: str) -> _FakeStream: + return self._streams.get(name, _FakeStream([])) + + def list_streams(self) -> list[str]: + return list(self._streams) + + +@dataclass +class _Status: + """Mimics EpisodeStatus fields the extractor reads via getattr.""" + + last_event: str + task_label: str | None = None + + +def _status(events: list[tuple[float, str, str | None]]) -> list[_Obs]: + """events = [(ts, last_event, label), ...]""" + return [_Obs(ts=ts, data=_Status(last_event=ev, task_label=lbl)) for ts, ev, lbl in events] + + +# ── resolve_field ──────────────────────────────────────────────────────────── + + +def test_resolve_field_attribute() -> None: + @dataclass + class Msg: + position: list[float] + + arr = resolve_field(Msg(position=[1.0, 2.0, 3.0]), StreamField(stream="x", field="position")) + assert isinstance(arr, np.ndarray) + np.testing.assert_array_equal(arr, np.array([1.0, 2.0, 3.0])) + + +def test_resolve_field_dict_payload() -> None: + arr = resolve_field({"q": [4, 5]}, StreamField(stream="x", field="q")) + np.testing.assert_array_equal(arr, np.array([4, 5])) + + +def test_resolve_field_none_passthrough_ndarray() -> None: + src = np.arange(6).reshape(2, 3) + out = resolve_field(src, StreamField(stream="x", field=None)) + assert out is src # ndarray passes straight through + + +def test_resolve_field_none_unwraps_data_attr() -> None: + @dataclass + class Image: + data: np.ndarray + + img = Image(data=np.ones((2, 2))) + out = resolve_field(img, StreamField(stream="x", field=None)) + np.testing.assert_array_equal(out, np.ones((2, 2))) + + +# ── extract_episodes: episode_status ───────────────────────────────────────── + + +def test_extract_start_save() -> None: + store = _FakeStore({"status": _status([(1.0, "start", "pick"), (5.0, "save", None)])}) + eps = extract_episodes(store, EpisodeExtractor(status_stream="status")) + assert len(eps) == 1 + assert eps[0].start_ts == 1.0 and eps[0].end_ts == 5.0 + assert eps[0].success is True + assert eps[0].task_label == "pick" + + +def test_extract_discard_marks_failure() -> None: + store = _FakeStore({"status": _status([(1.0, "start", None), (3.0, "discard", None)])}) + eps = extract_episodes(store, EpisodeExtractor(status_stream="status")) + assert len(eps) == 1 + assert eps[0].success is False + + +def test_extract_auto_commit_on_restart() -> None: + # start, then another start without save → first auto-commits (success=True) + store = _FakeStore( + { + "status": _status( + [ + (1.0, "start", None), + (4.0, "start", None), + (8.0, "save", None), + ] + ) + } + ) + eps = extract_episodes(store, EpisodeExtractor(status_stream="status")) + assert len(eps) == 2 + assert eps[0].start_ts == 1.0 and eps[0].end_ts == 4.0 and eps[0].success is True + assert eps[1].start_ts == 4.0 and eps[1].end_ts == 8.0 + + +def test_extract_pending_at_eof_dropped() -> None: + store = _FakeStore({"status": _status([(1.0, "start", None)])}) + eps = extract_episodes(store, EpisodeExtractor(status_stream="status")) + assert eps == [] + + +def test_extract_init_and_unknown_are_noops() -> None: + store = _FakeStore( + {"status": _status([(0.5, "init", None), (1.0, "start", None), (5.0, "save", None)])} + ) + eps = extract_episodes(store, EpisodeExtractor(status_stream="status")) + assert len(eps) == 1 + + +def test_extract_save_without_start_emits_nothing() -> None: + store = _FakeStore({"status": _status([(2.0, "save", None)])}) + assert extract_episodes(store, EpisodeExtractor(status_stream="status")) == [] + + +# ── extract_episodes: ranges ───────────────────────────────────────────────── + + +def test_extract_ranges() -> None: + cfg = EpisodeExtractor(extractor="ranges", ranges=[(0.0, 1.0), (2.0, 3.0)]) + eps = extract_episodes(_FakeStore({}), cfg) + assert [(e.start_ts, e.end_ts) for e in eps] == [(0.0, 1.0), (2.0, 3.0)] + + +def test_extract_ranges_empty() -> None: + cfg = EpisodeExtractor(extractor="ranges", ranges=None) + assert extract_episodes(_FakeStore({}), cfg) == [] + + +# ── iter_episode_samples ───────────────────────────────────────────────────── + + +def _scalar_stream(values: list[tuple[float, float]]) -> list[_Obs]: + """values = [(ts, scalar), ...] → messages with a `.position` 1-vector.""" + + @dataclass + class S: + position: list[float] + + return [_Obs(ts=ts, data=S(position=[v])) for ts, v in values] + + +def test_sync_basic_no_shift() -> None: + # obs == action, shift disabled → one sample per anchor target + store = _FakeStore( + { + "js": _scalar_stream([(0.0, 10.0), (1.0, 11.0), (2.0, 12.0)]), + } + ) + ep = Episode(id="ep_0", start_ts=0.0, end_ts=2.0) + streams = { + "state": StreamField(stream="js", field="position"), + "act": StreamField(stream="js", field="position"), + } + sync = SyncConfig(anchor="state", rate_hz=1.0, tolerance_ms=100.0, action_shift=0) + samples = list( + iter_episode_samples(store, ep, streams, sync, obs_keys={"state"}, action_keys={"act"}) + ) + assert len(samples) == 3 + # action equals state at the same frame + np.testing.assert_array_equal(samples[0].observation["state"], samples[0].action["act"]) + + +def test_sync_action_shift_next_state() -> None: + store = _FakeStore({"js": _scalar_stream([(0.0, 10.0), (1.0, 11.0), (2.0, 12.0)])}) + ep = Episode(id="ep_0", start_ts=0.0, end_ts=2.0) + streams = { + "state": StreamField(stream="js", field="position"), + "act": StreamField(stream="js", field="position"), + } + sync = SyncConfig(anchor="state", rate_hz=1.0, tolerance_ms=100.0, action_shift=1) + samples = list( + iter_episode_samples(store, ep, streams, sync, obs_keys={"state"}, action_keys={"act"}) + ) + # 3 frames, shift 1 → 2 emitted; trailing frame dropped + assert len(samples) == 2 + # frame 0: obs is state@0 (10), action is state@1 (11) + np.testing.assert_array_equal(samples[0].observation["state"], [10.0]) + np.testing.assert_array_equal(samples[0].action["act"], [11.0]) + np.testing.assert_array_equal(samples[1].observation["state"], [11.0]) + np.testing.assert_array_equal(samples[1].action["act"], [12.0]) + + +def test_sync_tolerance_skips_unmatched_frame() -> None: + # anchor ticks every 1s, but the second stream has a big gap around t=1 + store = _FakeStore( + { + "anchor": _scalar_stream([(0.0, 0.0), (1.0, 0.0), (2.0, 0.0)]), + "other": _scalar_stream([(0.0, 5.0), (2.0, 7.0)]), # nothing near t=1 + } + ) + ep = Episode(id="ep_0", start_ts=0.0, end_ts=2.0) + streams = { + "anchor": StreamField(stream="anchor", field="position"), + "other": StreamField(stream="other", field="position"), + } + sync = SyncConfig(anchor="anchor", rate_hz=1.0, tolerance_ms=100.0, action_shift=0) + samples = list(iter_episode_samples(store, ep, streams, sync, obs_keys={"anchor", "other"})) + # t=1 dropped (no `other` within 100ms) → only t=0 and t=2 survive + assert [round(s.ts) for s in samples] == [0, 2] + + +def test_sync_missing_anchor_raises() -> None: + ep = Episode(id="ep_0", start_ts=0.0, end_ts=1.0) + streams = {"x": StreamField(stream="x", field="position")} + sync = SyncConfig(anchor="not_there", rate_hz=1.0, tolerance_ms=10.0) + with pytest.raises(ValueError, match="anchor"): + list(iter_episode_samples(_FakeStore({}), ep, streams, sync)) + + +def test_sync_empty_anchor_yields_nothing() -> None: + store = _FakeStore({"a": []}) + ep = Episode(id="ep_0", start_ts=0.0, end_ts=1.0) + streams = {"a": StreamField(stream="a", field="position")} + sync = SyncConfig(anchor="a", rate_hz=1.0, tolerance_ms=10.0) + assert list(iter_episode_samples(store, ep, streams, sync)) == [] + + +# ── summarize_lengths ──────────────────────────────────────────────────────── + + +def test_summarize_lengths_uniform() -> None: + assert summarize_lengths([5, 5, 5]) == {"min": 5, "max": 5, "mean": 5.0, "uniform": True} + + +def test_summarize_lengths_varied() -> None: + s = summarize_lengths([2, 4, 6]) + assert s == {"min": 2, "max": 6, "mean": 4.0, "uniform": False} + + +def test_summarize_lengths_empty() -> None: + assert summarize_lengths([]) == {"min": 0, "max": 0, "mean": 0.0, "uniform": True} + + +# ── dimos_meta sidecar ─────────────────────────────────────────────────────── + + +def test_dimos_meta_records_sync_and_action_shift(tmp_path: Path) -> None: + cfg = DataPrepConfig( + source="s.db", + observation={"state": StreamField(stream="js", field="position")}, + action={"action": StreamField(stream="js", field="position")}, + sync=SyncConfig(anchor="state", rate_hz=14.0, tolerance_ms=80.0, action_shift=0), + output=OutputConfig(format="lerobot", path=tmp_path, metadata={"fps": 14}), + ) + _write_dimos_meta(tmp_path, cfg, episodes=[]) + + meta = json.loads((tmp_path / "dimos_meta.json").read_text()) + assert meta["sync"]["action_shift"] == 0 + assert meta["source"] == "s.db" + + +def test_dimos_meta_beside_file_for_hdf5(tmp_path: Path) -> None: + """hdf5 writer returns a FILE path; the sidecar must land beside it, not + inside it (which would treat the .hdf5 file as a directory and crash).""" + ds_file = tmp_path / "session.hdf5" + ds_file.write_bytes(b"\x89HDF\r\n") # stand-in for a real .hdf5 + cfg = DataPrepConfig(source="s.db", output=OutputConfig(format="hdf5", path=ds_file)) + + _write_dimos_meta(ds_file, cfg, episodes=[]) + + sidecar = tmp_path / "session.dimos_meta.json" + assert sidecar.exists() # beside the file, not session.hdf5/dimos_meta.json + assert json.loads(sidecar.read_text())["format"] == "hdf5" + + +def test_run_dataprep_rejects_shared_obs_action_key() -> None: + """A name in both obs and action would silently drop the obs feature when the + two maps merge; run_dataprep must reject it before opening the store.""" + cfg = DataPrepConfig( + source="nonexistent.db", # never reached — the check runs first + observation={"joints": StreamField(stream="joint_state", field="position")}, + action={"joints": StreamField(stream="joint_state", field="position")}, + ) + with pytest.raises(ValueError, match="share feature name"): + run_dataprep(cfg) diff --git a/dimos/robot/all_blueprints.py b/dimos/robot/all_blueprints.py index 291af8540e..3522b84b95 100644 --- a/dimos/robot/all_blueprints.py +++ b/dimos/robot/all_blueprints.py @@ -65,6 +65,8 @@ "keyboard-teleop-piper": "dimos.robot.manipulators.piper.blueprints:keyboard_teleop_piper", "keyboard-teleop-xarm6": "dimos.robot.manipulators.xarm.blueprints:keyboard_teleop_xarm6", "keyboard-teleop-xarm7": "dimos.robot.manipulators.xarm.blueprints:keyboard_teleop_xarm7", + "learning-collect-quest-piper": "dimos.learning.collection.blueprint:learning_collect_quest_piper", + "learning-collect-quest-xarm7": "dimos.learning.collection.blueprint:learning_collect_quest_xarm7", "mid360": "dimos.hardware.sensors.lidar.livox.livox_blueprints:mid360", "mid360-fastlio": "dimos.hardware.sensors.lidar.fastlio2.fastlio_blueprints:mid360_fastlio", "mid360-fastlio-ray-trace": "dimos.hardware.sensors.lidar.fastlio2.fastlio_blueprints:mid360_fastlio_ray_trace", @@ -141,6 +143,7 @@ "camera-module": "dimos.hardware.sensors.camera.module.CameraModule", "cartesian-motion-controller": "dimos.manipulation.control.servo_control.cartesian_motion_controller.CartesianMotionController", "click-start-goal-router": "dimos.navigation.nav_stack.modules.click_start_goal_router.click_start_goal_router.ClickStartGoalRouter", + "collection-recorder": "dimos.learning.collection.recorder.CollectionRecorder", "control-coordinator": "dimos.control.coordinator.ControlCoordinator", "cost-mapper": "dimos.mapping.costmapper.CostMapper", "demo-calculator-skill": "dimos.agents.skills.demo_calculator_skill.DemoCalculatorSkill", @@ -155,6 +158,7 @@ "drone-connection-module": "dimos.robot.drone.connection_module.DroneConnectionModule", "drone-tracking-module": "dimos.robot.drone.drone_tracking_module.DroneTrackingModule", "emitter-module": "dimos.utils.demo_image_encoding.EmitterModule", + "episode-monitor-module": "dimos.learning.collection.episode_monitor.EpisodeMonitorModule", "evaluator": "dimos.navigation.nav_3d.evaluator.evaluator.Evaluator", "far-planner": "dimos.navigation.nav_stack.modules.far_planner.far_planner.FarPlanner", "fast-lio2": "dimos.hardware.sensors.lidar.fastlio2.module.FastLio2", diff --git a/dimos/robot/cli/dimos.py b/dimos/robot/cli/dimos.py index 3f94a6be4e..6c80ea88d5 100644 --- a/dimos/robot/cli/dimos.py +++ b/dimos/robot/cli/dimos.py @@ -681,6 +681,40 @@ def send( map_app = typer.Typer(help="Voxel-map tools over recorded sqlite datasets") main.add_typer(map_app, name="map") map_app.command("global")(_map_main) + + +dataprep_app = typer.Typer(help="Build and inspect learning datasets from recordings") +main.add_typer(dataprep_app, name="dataprep") + + +@dataprep_app.command("build") +def dataprep_build( + source: Path = typer.Option(None, "--source", "-s", help="Recording .db to read"), + output: Path = typer.Option(None, "--output", help="Dataset output directory"), + output_format: str = typer.Option(None, "--format", "-f", help="Output format: lerobot | hdf5"), + config_path: Path = typer.Option( + None, "--config", "-c", help="JSON DataPrepConfig (needed for obs/action stream maps)" + ), +) -> None: + """Build a dataset from a recording (lerobot/hdf5 + dimos_meta.json).""" + from dimos.learning.dataprep.cli import build + + build(config_path, source, output, output_format) + + +@dataprep_app.command("inspect") +def dataprep_inspect( + dataset: Path = typer.Argument(None, help="Built dataset: a .hdf5 file or a lerobot directory"), + output_format: str = typer.Option( + None, "--format", "-f", help="lerobot | hdf5 (auto-detected from the path if omitted)" + ), +) -> None: + """Summarize a built dataset: features, shapes, episode/frame counts, uniformity.""" + from dimos.learning.dataprep.cli import inspect + + inspect(dataset, output_format) + + map_app.command("summary")(_map_summary_main) map_app.command("rename")(_map_rename_main) map_app.command("pose-fill")(_map_pose_fill_main) diff --git a/dimos/robot/test_all_blueprints.py b/dimos/robot/test_all_blueprints.py index cdf72e9b6b..ec51e681f5 100644 --- a/dimos/robot/test_all_blueprints.py +++ b/dimos/robot/test_all_blueprints.py @@ -50,6 +50,8 @@ "coordinator-xarm6", "coordinator-xarm7", "dual-xarm6-planner", + "learning-collect-quest-piper", + "learning-collect-quest-xarm7", "teleop-hosted-go2", "teleop-hosted-xarm7", "teleop-quest-dual", diff --git a/dimos/teleop/quest/quest_types.py b/dimos/teleop/quest/quest_types.py index 7e7cfc7620..7757a926b5 100644 --- a/dimos/teleop/quest/quest_types.py +++ b/dimos/teleop/quest/quest_types.py @@ -195,4 +195,21 @@ def from_controllers( return buttons -__all__ = ["Buttons", "QuestControllerState", "ThumbstickState"] +# Quest controller face-button labels → Buttons attribute names. Callers can +# also pass a raw attribute name (e.g. "right_grip") directly where an alias is +# accepted. +BUTTON_ALIASES: dict[str, str] = { + "A": "right_primary", + "B": "right_secondary", + "X": "left_primary", + "Y": "left_secondary", + "LT": "left_trigger", + "RT": "right_trigger", + "LG": "left_grip", + "RG": "right_grip", + "MENU_L": "left_menu", + "MENU_R": "right_menu", +} + + +__all__ = ["BUTTON_ALIASES", "Buttons", "QuestControllerState", "ThumbstickState"] diff --git a/pyproject.toml b/pyproject.toml index a5415b272f..f6844dcc10 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -200,6 +200,13 @@ visualization = [ "dimos-viewer==0.32.0a1", ] +learning = [ + # dimos.learning.dataprep dataset writers (lazy-imported per format) + "pyarrow", # LeRobot v3.0 data/episodes parquet + "pandas", # LeRobot v3.0 tasks.parquet (task-indexed) + "h5py", # HDF5 writer +] + agents = [ "langchain>=1.2.3,<2", "langchain-chroma>=1,<2", @@ -530,6 +537,8 @@ module = [ "etils", "faster_whisper", "geometry_msgs.*", + "h5py", + "h5py.*", "lazy_loader", "mcap", "mcap.*", @@ -547,6 +556,8 @@ module = [ "pycuda.*", "pydrake", "pydrake.*", + "pyarrow", + "pyarrow.*", "pyzed", "pyzed.*", "rclpy.*", diff --git a/uv.lock b/uv.lock index 7eee927cc4..7a599dfa96 100644 --- a/uv.lock +++ b/uv.lock @@ -2074,6 +2074,12 @@ dds = [ drone = [ { name = "pymavlink" }, ] +learning = [ + { name = "h5py" }, + { name = "pandas", version = "2.3.3", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.11'" }, + { name = "pandas", version = "3.0.0", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.11'" }, + { name = "pyarrow" }, +] manipulation = [ { name = "a750-control", marker = "platform_machine == 'x86_64' and sys_platform == 'linux'" }, { name = "drake", version = "1.45.0", source = { registry = "https://pypi.org/simple" }, marker = "platform_machine != 'aarch64' and sys_platform == 'darwin'" }, @@ -2405,6 +2411,7 @@ requires-dist = [ { name = "gdown", marker = "extra == 'misc'", specifier = ">=5.2.2" }, { name = "googlemaps", marker = "extra == 'misc'", specifier = ">=4.10.0" }, { name = "gtsam-extended", marker = "extra == 'mapping'", specifier = ">=4.3a1.post1" }, + { name = "h5py", marker = "extra == 'learning'" }, { name = "hydra-core", marker = "extra == 'perception'", specifier = ">=1.3.0" }, { name = "ipykernel", marker = "extra == 'misc'" }, { name = "jinja2", marker = "extra == 'web'", specifier = ">=3.1.6" }, @@ -2439,6 +2446,7 @@ requires-dist = [ { name = "openai", marker = "extra == 'agents'" }, { name = "opencv-contrib-python", marker = "extra == 'apriltag'", specifier = "==4.10.0.84" }, { name = "opencv-python" }, + { name = "pandas", marker = "extra == 'learning'" }, { name = "pillow", marker = "extra == 'perception'" }, { name = "pin", specifier = ">=3.3.0" }, { name = "pin-pink", marker = "extra == 'manipulation'", specifier = ">=4.2.0" }, @@ -2451,6 +2459,7 @@ requires-dist = [ { name = "protobuf", specifier = ">=6.33.5,<7" }, { name = "psutil", specifier = ">=7.0.0" }, { name = "psycopg2-binary", marker = "extra == 'psql'", specifier = ">=2.9.11" }, + { name = "pyarrow", marker = "extra == 'learning'" }, { name = "pycollada", marker = "extra == 'manipulation'" }, { name = "pydantic" }, { name = "pydantic-settings", specifier = ">=2.11.0,<3" }, @@ -2499,7 +2508,7 @@ requires-dist = [ { name = "xformers", marker = "platform_machine == 'x86_64' and extra == 'cuda'", specifier = ">=0.0.20" }, { name = "yapf", marker = "extra == 'misc'", specifier = "==0.40.2" }, ] -provides-extras = ["misc", "visualization", "agents", "web", "perception", "unitree", "unitree-dds", "manipulation", "cpu", "cuda", "psql", "sim", "mapping", "drone", "dds", "base", "apriltag", "all"] +provides-extras = ["misc", "visualization", "learning", "agents", "web", "perception", "unitree", "unitree-dds", "manipulation", "cpu", "cuda", "psql", "sim", "mapping", "drone", "dds", "base", "apriltag", "all"] [package.metadata.requires-dev] autofix = [{ name = "ruff", specifier = "==0.14.3" }] @@ -3661,6 +3670,65 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/69/b2/119f6e6dcbd96f9069ce9a2665e0146588dc9f88f29549711853645e736a/h2-4.3.0-py3-none-any.whl", hash = "sha256:c438f029a25f7945c69e0ccf0fb951dc3f73a5f6412981daee861431b70e2bdd", size = 61779, upload-time = "2025-08-23T18:12:17.779Z" }, ] +[[package]] +name = "h5py" +version = "3.16.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "numpy", version = "2.2.6", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.11'" }, + { name = "numpy", version = "2.3.5", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.11'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/db/33/acd0ce6863b6c0d7735007df01815403f5589a21ff8c2e1ee2587a38f548/h5py-3.16.0.tar.gz", hash = "sha256:a0dbaad796840ccaa67a4c144a0d0c8080073c34c76d5a6941d6818678ef2738", size = 446526, upload-time = "2026-03-06T13:49:08.07Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/3a/6b/231413e58a787a89b316bb0d1777da3c62257e4797e09afd8d17ad3549dc/h5py-3.16.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:e06f864bedb2c8e7c1358e6c73af48519e317457c444d6f3d332bb4e8fa6d7d9", size = 3724137, upload-time = "2026-03-06T13:47:35.242Z" }, + { url = "https://files.pythonhosted.org/packages/74/f9/557ce3aad0fe8471fb5279bab0fc56ea473858a022c4ce8a0b8f303d64e9/h5py-3.16.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:ec86d4fffd87a0f4cb3d5796ceb5a50123a2a6d99b43e616e5504e66a953eca3", size = 3090112, upload-time = "2026-03-06T13:47:37.634Z" }, + { url = "https://files.pythonhosted.org/packages/7a/f5/e15b3d0dc8a18e56409a839e6468d6fb589bc5207c917399c2e0706eeb44/h5py-3.16.0-cp310-cp310-manylinux_2_28_aarch64.whl", hash = "sha256:86385ea895508220b8a7e45efa428aeafaa586bd737c7af9ee04661d8d84a10d", size = 4844847, upload-time = "2026-03-06T13:47:39.811Z" }, + { url = "https://files.pythonhosted.org/packages/cb/92/a8851d936547efe30cc0ce5245feac01f3ec6171f7899bc3f775c72030b3/h5py-3.16.0-cp310-cp310-manylinux_2_28_x86_64.whl", hash = "sha256:8975273c2c5921c25700193b408e28d6bdd0111c37468b2d4e25dcec4cd1d84d", size = 5065352, upload-time = "2026-03-06T13:47:41.489Z" }, + { url = "https://files.pythonhosted.org/packages/2b/ae/f2adc5d0ca9626db3277a3d87516e124cbc5d0eea0bd79bc085702d04f2c/h5py-3.16.0-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:1677ad48b703f44efc9ea0c3ab284527f81bc4f318386aaaebc5fede6bbae56f", size = 4839173, upload-time = "2026-03-06T13:47:43.586Z" }, + { url = "https://files.pythonhosted.org/packages/64/0b/e0c8c69da1d8838da023a50cd3080eae5d475691f7636b35eff20bb6ef20/h5py-3.16.0-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:7c4dd4cf5f0a4e36083f73172f6cfc25a5710789269547f132a20975bfe2434c", size = 5076216, upload-time = "2026-03-06T13:47:45.315Z" }, + { url = "https://files.pythonhosted.org/packages/66/35/d88fd6718832133c885004c61ceeeb24dbd6397ef877dbed6b3a64d6a286/h5py-3.16.0-cp310-cp310-win_amd64.whl", hash = "sha256:bdef06507725b455fccba9c16529121a5e1fbf56aa375f7d9713d9e8ff42454d", size = 3183639, upload-time = "2026-03-06T13:47:47.041Z" }, + { url = "https://files.pythonhosted.org/packages/ba/95/a825894f3e45cbac7554c4e97314ce886b233a20033787eda755ca8fecc7/h5py-3.16.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:719439d14b83f74eeb080e9650a6c7aa6d0d9ea0ca7f804347b05fac6fbf18af", size = 3721663, upload-time = "2026-03-06T13:47:49.599Z" }, + { url = "https://files.pythonhosted.org/packages/bf/3b/38ff88b347c3e346cda1d3fc1b65a7aa75d40632228d8b8a5d7b58508c24/h5py-3.16.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:c3f0a0e136f2e95dd0b67146abb6668af4f1a69c81ef8651a2d316e8e01de447", size = 3087630, upload-time = "2026-03-06T13:47:51.249Z" }, + { url = "https://files.pythonhosted.org/packages/98/a8/2594cef906aee761601eff842c7dc598bea2b394a3e1c00966832b8eeb7c/h5py-3.16.0-cp311-cp311-manylinux_2_28_aarch64.whl", hash = "sha256:a6fbc5367d4046801f9b7db9191b31895f22f1c6df1f9987d667854cac493538", size = 4823472, upload-time = "2026-03-06T13:47:53.085Z" }, + { url = "https://files.pythonhosted.org/packages/52/a0/c1f604538ff6db22a0690be2dc44ab59178e115f63c917794e529356ab23/h5py-3.16.0-cp311-cp311-manylinux_2_28_x86_64.whl", hash = "sha256:fb1720028d99040792bb2fb31facb8da44a6f29df7697e0b84f0d79aff2e9bd3", size = 5027150, upload-time = "2026-03-06T13:47:55.043Z" }, + { url = "https://files.pythonhosted.org/packages/2e/fd/301739083c2fc4fd89950f9bcfce75d6e14b40b0ca3d40e48a8993d1722c/h5py-3.16.0-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:314b6054fe0b1051c2b0cb2df5cbdab15622fb05e80f202e3b6a5eee0d6fe365", size = 4814544, upload-time = "2026-03-06T13:47:56.893Z" }, + { url = "https://files.pythonhosted.org/packages/4c/42/2193ed41ccee78baba8fcc0cff2c925b8b9ee3793305b23e1f22c20bf4c7/h5py-3.16.0-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:ffbab2fedd6581f6aa31cf1639ca2cb86e02779de525667892ebf4cc9fd26434", size = 5034013, upload-time = "2026-03-06T13:47:59.01Z" }, + { url = "https://files.pythonhosted.org/packages/f7/20/e6c0ff62ca2ad1a396a34f4380bafccaaf8791ff8fccf3d995a1fc12d417/h5py-3.16.0-cp311-cp311-win_amd64.whl", hash = "sha256:17d1f1630f92ad74494a9a7392ab25982ce2b469fc62da6074c0ce48366a2999", size = 3191673, upload-time = "2026-03-06T13:48:00.626Z" }, + { url = "https://files.pythonhosted.org/packages/f2/48/239cbe352ac4f2b8243a8e620fa1a2034635f633731493a7ff1ed71e8658/h5py-3.16.0-cp311-cp311-win_arm64.whl", hash = "sha256:85b9c49dd58dc44cf70af944784e2c2038b6f799665d0dcbbc812a26e0faa859", size = 2673834, upload-time = "2026-03-06T13:48:02.579Z" }, + { url = "https://files.pythonhosted.org/packages/c8/c0/5d4119dba94093bbafede500d3defd2f5eab7897732998c04b54021e530b/h5py-3.16.0-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:c5313566f4643121a78503a473f0fb1e6dcc541d5115c44f05e037609c565c4d", size = 3685604, upload-time = "2026-03-06T13:48:04.198Z" }, + { url = "https://files.pythonhosted.org/packages/b0/42/c84efcc1d4caebafb1ecd8be4643f39c85c47a80fe254d92b8b43b1eadaf/h5py-3.16.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:42b012933a83e1a558c673176676a10ce2fd3759976a0fedee1e672d1e04fc9d", size = 3061940, upload-time = "2026-03-06T13:48:05.783Z" }, + { url = "https://files.pythonhosted.org/packages/89/84/06281c82d4d1686fde1ac6b0f307c50918f1c0151062445ab3b6fa5a921d/h5py-3.16.0-cp312-cp312-manylinux_2_28_aarch64.whl", hash = "sha256:ff24039e2573297787c3063df64b60aab0591980ac898329a08b0320e0cf2527", size = 5198852, upload-time = "2026-03-06T13:48:07.482Z" }, + { url = "https://files.pythonhosted.org/packages/9e/e9/1a19e42cd43cc1365e127db6aae85e1c671da1d9a5d746f4d34a50edb577/h5py-3.16.0-cp312-cp312-manylinux_2_28_x86_64.whl", hash = "sha256:dfc21898ff025f1e8e67e194965a95a8d4754f452f83454538f98f8a3fcb207e", size = 5405250, upload-time = "2026-03-06T13:48:09.628Z" }, + { url = "https://files.pythonhosted.org/packages/b7/8e/9790c1655eabeb85b92b1ecab7d7e62a2069e53baefd58c98f0909c7a948/h5py-3.16.0-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:698dd69291272642ffda44a0ecd6cd3bda5faf9621452d255f57ce91487b9794", size = 5190108, upload-time = "2026-03-06T13:48:11.26Z" }, + { url = "https://files.pythonhosted.org/packages/51/d7/ab693274f1bd7e8c5f9fdd6c7003a88d59bedeaf8752716a55f532924fbb/h5py-3.16.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:2b2c02b0a160faed5fb33f1ba8a264a37ee240b22e049ecc827345d0d9043074", size = 5419216, upload-time = "2026-03-06T13:48:13.322Z" }, + { url = "https://files.pythonhosted.org/packages/03/c1/0976b235cf29ead553e22f2fb6385a8252b533715e00d0ae52ed7b900582/h5py-3.16.0-cp312-cp312-win_amd64.whl", hash = "sha256:96b422019a1c8975c2d5dadcf61d4ba6f01c31f92bbde6e4649607885fe502d6", size = 3182868, upload-time = "2026-03-06T13:48:15.759Z" }, + { url = "https://files.pythonhosted.org/packages/14/d9/866b7e570b39070f92d47b0ff1800f0f8239b6f9e45f02363d7112336c1f/h5py-3.16.0-cp312-cp312-win_arm64.whl", hash = "sha256:39c2838fb1e8d97bcf1755e60ad1f3dd76a7b2a475928dc321672752678b96db", size = 2653286, upload-time = "2026-03-06T13:48:17.279Z" }, + { url = "https://files.pythonhosted.org/packages/0f/9e/6142ebfda0cb6e9349c091eae73c2e01a770b7659255248d637bec54a88b/h5py-3.16.0-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:370a845f432c2c9619db8eed334d1e610c6015796122b0e57aa46312c22617d9", size = 3671808, upload-time = "2026-03-06T13:48:19.737Z" }, + { url = "https://files.pythonhosted.org/packages/b0/65/5e088a45d0f43cd814bc5bec521c051d42005a472e804b1a36c48dada09b/h5py-3.16.0-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:42108e93326c50c2810025aade9eac9d6827524cdccc7d4b75a546e5ab308edb", size = 3045837, upload-time = "2026-03-06T13:48:21.854Z" }, + { url = "https://files.pythonhosted.org/packages/da/1e/6172269e18cc5a484e2913ced33339aad588e02ba407fafd00d369e22ef3/h5py-3.16.0-cp313-cp313-manylinux_2_28_aarch64.whl", hash = "sha256:099f2525c9dcf28de366970a5fb34879aab20491589fa89ce2863a84218bb524", size = 5193860, upload-time = "2026-03-06T13:48:24.071Z" }, + { url = "https://files.pythonhosted.org/packages/bd/98/ef2b6fe2903e377cbe870c3b2800d62552f1e3dbe81ce49e1923c53d1c5c/h5py-3.16.0-cp313-cp313-manylinux_2_28_x86_64.whl", hash = "sha256:9300ad32dea9dfc5171f94d5f6948e159ed93e4701280b0f508773b3f582f402", size = 5400417, upload-time = "2026-03-06T13:48:25.728Z" }, + { url = "https://files.pythonhosted.org/packages/bc/81/5b62d760039eed64348c98129d17061fdfc7839fc9c04eaaad6dee1004e4/h5py-3.16.0-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:171038f23bccddfc23f344cadabdfc9917ff554db6a0d417180d2747fe4c75a7", size = 5185214, upload-time = "2026-03-06T13:48:27.436Z" }, + { url = "https://files.pythonhosted.org/packages/28/c4/532123bcd9080e250696779c927f2cb906c8bf3447df98f5ceb8dcded539/h5py-3.16.0-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:7e420b539fb6023a259a1b14d4c9f6df8cf50d7268f48e161169987a57b737ff", size = 5414598, upload-time = "2026-03-06T13:48:29.49Z" }, + { url = "https://files.pythonhosted.org/packages/c3/d9/a27997f84341fc0dfcdd1fe4179b6ba6c32a7aa880fdb8c514d4dad6fba3/h5py-3.16.0-cp313-cp313-win_amd64.whl", hash = "sha256:18f2bbcd545e6991412253b98727374c356d67caa920e68dc79eab36bf5fedad", size = 3175509, upload-time = "2026-03-06T13:48:31.131Z" }, + { url = "https://files.pythonhosted.org/packages/a5/23/bb8647521d4fd770c30a76cfc6cb6a2f5495868904054e92f2394c5a78ff/h5py-3.16.0-cp313-cp313-win_arm64.whl", hash = "sha256:656f00e4d903199a1d58df06b711cf3ca632b874b4207b7dbec86185b5c8c7d4", size = 2647362, upload-time = "2026-03-06T13:48:33.411Z" }, + { url = "https://files.pythonhosted.org/packages/48/3c/7fcd9b4c9eed82e91fb15568992561019ae7a829d1f696b2c844355d95dd/h5py-3.16.0-cp314-cp314-macosx_10_15_x86_64.whl", hash = "sha256:9c9d307c0ef862d1cd5714f72ecfafe0a5d7529c44845afa8de9f46e5ba8bd65", size = 3678608, upload-time = "2026-03-06T13:48:35.183Z" }, + { url = "https://files.pythonhosted.org/packages/6a/b7/9366ed44ced9b7ef357ab48c94205280276db9d7f064aa3012a97227e966/h5py-3.16.0-cp314-cp314-macosx_11_0_arm64.whl", hash = "sha256:8c1eff849cdd53cbc73c214c30ebdb6f1bb8b64790b4b4fc36acdb5e43570210", size = 3054773, upload-time = "2026-03-06T13:48:37.139Z" }, + { url = "https://files.pythonhosted.org/packages/58/a5/4964bc0e91e86340c2bbda83420225b2f770dcf1eb8a39464871ad769436/h5py-3.16.0-cp314-cp314-manylinux_2_28_aarch64.whl", hash = "sha256:e2c04d129f180019e216ee5f9c40b78a418634091c8782e1f723a6ca3658b965", size = 5198886, upload-time = "2026-03-06T13:48:38.879Z" }, + { url = "https://files.pythonhosted.org/packages/f1/16/d905e7f53e661ce2c24686c38048d8e2b750ffc4350009d41c4e6c6c9826/h5py-3.16.0-cp314-cp314-manylinux_2_28_x86_64.whl", hash = "sha256:e4360f15875a532bc7b98196c7592ed4fc92672a57c0a621355961cafb17a6dd", size = 5404883, upload-time = "2026-03-06T13:48:41.324Z" }, + { url = "https://files.pythonhosted.org/packages/4b/f2/58f34cb74af46d39f4cd18ea20909a8514960c5a3e5b92fd06a28161e0a8/h5py-3.16.0-cp314-cp314-musllinux_1_2_aarch64.whl", hash = "sha256:3fae9197390c325e62e0a1aa977f2f62d994aa87aab182abbea85479b791197c", size = 5192039, upload-time = "2026-03-06T13:48:43.117Z" }, + { url = "https://files.pythonhosted.org/packages/ce/ca/934a39c24ce2e2db017268c08da0537c20fa0be7e1549be3e977313fc8f5/h5py-3.16.0-cp314-cp314-musllinux_1_2_x86_64.whl", hash = "sha256:43259303989ac8adacc9986695b31e35dba6fd1e297ff9c6a04b7da5542139cc", size = 5421526, upload-time = "2026-03-06T13:48:44.838Z" }, + { url = "https://files.pythonhosted.org/packages/3e/14/615a450205e1b56d16c6783f5ccd116cde05550faad70ae077c955654a75/h5py-3.16.0-cp314-cp314-win_amd64.whl", hash = "sha256:fa48993a0b799737ba7fd21e2350fa0a60701e58180fae9f2de834bc39a147ab", size = 3183263, upload-time = "2026-03-06T13:48:47.117Z" }, + { url = "https://files.pythonhosted.org/packages/7b/48/a6faef5ed632cae0c65ac6b214a6614a0b510c3183532c521bdb0055e117/h5py-3.16.0-cp314-cp314-win_arm64.whl", hash = "sha256:1897a771a7f40d05c262fc8f37376ec37873218544b70216872876c627640f63", size = 2663450, upload-time = "2026-03-06T13:48:48.707Z" }, + { url = "https://files.pythonhosted.org/packages/5d/32/0c8bb8aedb62c772cf7c1d427c7d1951477e8c2835f872bc0a13d1f85f86/h5py-3.16.0-cp314-cp314t-macosx_10_15_x86_64.whl", hash = "sha256:15922e485844f77c0b9d275396d435db3baa58292a9c2176a386e072e0cf2491", size = 3760693, upload-time = "2026-03-06T13:48:50.453Z" }, + { url = "https://files.pythonhosted.org/packages/1d/1f/fcc5977d32d6387c5c9a694afee716a5e20658ac08b3ff24fdec79fb05f2/h5py-3.16.0-cp314-cp314t-macosx_11_0_arm64.whl", hash = "sha256:df02dd29bd247f98674634dfe41f89fd7c16ba3d7de8695ec958f58404a4e618", size = 3181305, upload-time = "2026-03-06T13:48:52.221Z" }, + { url = "https://files.pythonhosted.org/packages/f5/a1/af87f64b9f986889884243643621ebbd4ac72472ba8ec8cec891ac8e2ca1/h5py-3.16.0-cp314-cp314t-manylinux_2_28_aarch64.whl", hash = "sha256:0f456f556e4e2cebeebd9d66adf8dc321770a42593494a0b6f0af54a7567b242", size = 5074061, upload-time = "2026-03-06T13:48:54.089Z" }, + { url = "https://files.pythonhosted.org/packages/cc/d0/146f5eaff3dc246a9c7f6e5e4f42bd45cc613bce16693bcd4d1f7c958bf5/h5py-3.16.0-cp314-cp314t-manylinux_2_28_x86_64.whl", hash = "sha256:3e6cb3387c756de6a9492d601553dffea3fe11b5f22b443aac708c69f3f55e16", size = 5279216, upload-time = "2026-03-06T13:48:56.75Z" }, + { url = "https://files.pythonhosted.org/packages/a1/9d/12a13424f1e604fc7df9497b73c0356fb78c2fb206abd7465ce47226e8fd/h5py-3.16.0-cp314-cp314t-musllinux_1_2_aarch64.whl", hash = "sha256:8389e13a1fd745ad2856873e8187fd10268b2d9677877bb667b41aebd771d8b7", size = 5070068, upload-time = "2026-03-06T13:48:59.169Z" }, + { url = "https://files.pythonhosted.org/packages/41/8c/bbe98f813722b4873818a8db3e15aa3e625b59278566905ac439725e8070/h5py-3.16.0-cp314-cp314t-musllinux_1_2_x86_64.whl", hash = "sha256:346df559a0f7dcb31cf8e44805319e2ab24b8957c45e7708ce503b2ec79ba725", size = 5300253, upload-time = "2026-03-06T13:49:02.033Z" }, + { url = "https://files.pythonhosted.org/packages/32/9e/87e6705b4d6890e7cecdf876e2a7d3e40654a2ae37482d79a6f1b87f7b92/h5py-3.16.0-cp314-cp314t-win_amd64.whl", hash = "sha256:4c6ab014ab704b4feaa719ae783b86522ed0bf1f82184704ed3c9e4e3228796e", size = 3381671, upload-time = "2026-03-06T13:49:04.351Z" }, + { url = "https://files.pythonhosted.org/packages/96/91/9fad90cfc5f9b2489c7c26ad897157bce82f0e9534a986a221b99760b23b/h5py-3.16.0-cp314-cp314t-win_arm64.whl", hash = "sha256:faca8fb4e4319c09d83337adc80b2ca7d5c5a343c2d6f1b6388f32cfecca13c1", size = 2740706, upload-time = "2026-03-06T13:49:06.347Z" }, +] + [[package]] name = "hf-xet" version = "1.2.0" @@ -7340,16 +7408,17 @@ source = { registry = "https://pypi.org/simple" } resolution-markers = [ "python_full_version < '3.11' and platform_machine == 'x86_64' and sys_platform == 'darwin'", "python_full_version < '3.11' and platform_machine != 'x86_64' and sys_platform == 'darwin'", + "python_full_version < '3.11' and platform_machine == 'aarch64' and sys_platform == 'linux'", "python_full_version < '3.11' and platform_machine == 'x86_64' and sys_platform == 'win32'", "python_full_version < '3.11' and platform_machine != 'x86_64' and sys_platform == 'win32'", "python_full_version < '3.11' and platform_machine == 'x86_64' and sys_platform != 'darwin' and sys_platform != 'win32'", "(python_full_version < '3.11' and platform_machine != 'aarch64' and platform_machine != 'x86_64' and sys_platform == 'linux') or (python_full_version < '3.11' and platform_machine != 'x86_64' and sys_platform != 'darwin' and sys_platform != 'linux' and sys_platform != 'win32')", ] dependencies = [ - { name = "numpy", version = "2.2.6", source = { registry = "https://pypi.org/simple" }, marker = "(python_full_version < '3.11' and platform_machine != 'aarch64') or (python_full_version < '3.11' and sys_platform != 'linux')" }, - { name = "python-dateutil", marker = "(python_full_version < '3.11' and platform_machine != 'aarch64') or (python_full_version < '3.11' and sys_platform != 'linux')" }, - { name = "pytz", marker = "(python_full_version < '3.11' and platform_machine != 'aarch64') or (python_full_version < '3.11' and sys_platform != 'linux')" }, - { name = "tzdata", marker = "(python_full_version < '3.11' and platform_machine != 'aarch64') or (python_full_version < '3.11' and sys_platform != 'linux')" }, + { name = "numpy", version = "2.2.6", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.11'" }, + { name = "python-dateutil", marker = "python_full_version < '3.11'" }, + { name = "pytz", marker = "python_full_version < '3.11'" }, + { name = "tzdata", marker = "python_full_version < '3.11'" }, ] sdist = { url = "https://files.pythonhosted.org/packages/33/01/d40b85317f86cf08d853a4f495195c73815fdf205eef3993821720274518/pandas-2.3.3.tar.gz", hash = "sha256:e05e1af93b977f7eafa636d043f9f94c7ee3ac81af99c13508215942e64c993b", size = 4495223, upload-time = "2025-09-29T23:34:51.853Z" } wheels = [ @@ -7413,6 +7482,9 @@ resolution-markers = [ "python_full_version == '3.13.*' and platform_machine != 'x86_64' and sys_platform == 'darwin'", "python_full_version == '3.12.*' and platform_machine == 'x86_64' and sys_platform == 'darwin'", "python_full_version == '3.12.*' and platform_machine != 'x86_64' and sys_platform == 'darwin'", + "python_full_version >= '3.14' and platform_machine == 'aarch64' and sys_platform == 'linux'", + "python_full_version == '3.13.*' and platform_machine == 'aarch64' and sys_platform == 'linux'", + "python_full_version == '3.12.*' and platform_machine == 'aarch64' and sys_platform == 'linux'", "python_full_version >= '3.14' and platform_machine == 'x86_64' and sys_platform == 'win32'", "python_full_version >= '3.14' and platform_machine != 'x86_64' and sys_platform == 'win32'", "python_full_version == '3.13.*' and platform_machine == 'x86_64' and sys_platform == 'win32'", @@ -7427,14 +7499,15 @@ resolution-markers = [ "(python_full_version == '3.12.*' and platform_machine != 'aarch64' and platform_machine != 'x86_64' and sys_platform == 'linux') or (python_full_version == '3.12.*' and platform_machine != 'x86_64' and sys_platform != 'darwin' and sys_platform != 'linux' and sys_platform != 'win32')", "python_full_version == '3.11.*' and platform_machine == 'x86_64' and sys_platform == 'darwin'", "python_full_version == '3.11.*' and platform_machine != 'x86_64' and sys_platform == 'darwin'", + "python_full_version == '3.11.*' and platform_machine == 'aarch64' and sys_platform == 'linux'", "python_full_version == '3.11.*' and platform_machine == 'x86_64' and sys_platform == 'win32'", "python_full_version == '3.11.*' and platform_machine != 'x86_64' and sys_platform == 'win32'", "python_full_version == '3.11.*' and platform_machine == 'x86_64' and sys_platform != 'darwin' and sys_platform != 'win32'", "(python_full_version == '3.11.*' and platform_machine != 'aarch64' and platform_machine != 'x86_64' and sys_platform == 'linux') or (python_full_version == '3.11.*' and platform_machine != 'x86_64' and sys_platform != 'darwin' and sys_platform != 'linux' and sys_platform != 'win32')", ] dependencies = [ - { name = "numpy", version = "2.3.5", source = { registry = "https://pypi.org/simple" }, marker = "(python_full_version >= '3.11' and platform_machine != 'aarch64') or (python_full_version >= '3.11' and sys_platform != 'linux')" }, - { name = "python-dateutil", marker = "(python_full_version >= '3.11' and platform_machine != 'aarch64') or (python_full_version >= '3.11' and sys_platform != 'linux')" }, + { name = "numpy", version = "2.3.5", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.11'" }, + { name = "python-dateutil", marker = "python_full_version >= '3.11'" }, { name = "tzdata", marker = "(python_full_version >= '3.11' and sys_platform == 'emscripten') or (python_full_version >= '3.11' and sys_platform == 'win32')" }, ] sdist = { url = "https://files.pythonhosted.org/packages/de/da/b1dc0481ab8d55d0f46e343cfe67d4551a0e14fcee52bd38ca1bd73258d8/pandas-3.0.0.tar.gz", hash = "sha256:0facf7e87d38f721f0af46fe70d97373a37701b1c09f7ed7aeeb292ade5c050f", size = 4633005, upload-time = "2026-01-21T15:52:04.726Z" }