Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
ab7f7fe
initial commit: dataprep step spec
ruthwikdasyam Apr 27, 2026
7c9f535
temp: learning spec files
ruthwikdasyam Apr 30, 2026
a10dce1
temp spec update
ruthwikdasyam May 1, 2026
779d100
learning pipeline spec
ruthwikdasyam May 1, 2026
20fcb4b
Merge branch 'dev' into ruthwik/learning/1
ruthwikdasyam May 4, 2026
0cdbe55
tested commit slop
ruthwikdasyam May 5, 2026
c28d90e
Merge branch 'main' into ruthwik/learning/1
ruthwikdasyam May 29, 2026
353d0b4
feat: xarm7 inference
ruthwikdasyam May 29, 2026
3382826
Merge remote-tracking branch 'origin/main' into ruthwik/datacollection
ruthwikdasyam Jun 4, 2026
fd0c05a
remove training and inference codes
ruthwikdasyam Jun 4, 2026
1630d6b
Merge branch 'main' into ruthwik/datacollection
ruthwikdasyam Jun 4, 2026
95fa239
docs: remove readme
ruthwikdasyam Jun 4, 2026
475565b
feat: dataprep folder
ruthwikdasyam Jun 4, 2026
38d434c
feat: add recorder
ruthwikdasyam Jun 4, 2026
b39ceaf
fix: ore-commit
ruthwikdasyam Jun 4, 2026
6a48475
fix: episodeextractor default
ruthwikdasyam Jun 4, 2026
6fc0cff
refactor: dimos dataprep subcommand with build and inspect
ruthwikdasyam Jun 10, 2026
45bebf6
fix: pre-commit fixes
ruthwikdasyam Jun 10, 2026
6505837
feat: live logs of episode status
ruthwikdasyam Jun 15, 2026
76f6719
fix: dataprep status_stream default, rgb→bgr, drop button recording
ruthwikdasyam Jun 16, 2026
06b1c8a
fix: pre-commit checks
ruthwikdasyam Jun 16, 2026
feb93c6
feat: dataprep action-shift + collection status log, fixes
ruthwikdasyam Jun 17, 2026
8b0da13
fix: db path + cam sim support
ruthwikdasyam Jun 17, 2026
d422708
session_db file name with datetime
ruthwikdasyam Jun 17, 2026
d10b955
fix: episode toggle button
ruthwikdasyam Jun 18, 2026
66a31d6
fix: dataprep float32 + lerobot timestamp/stats fixes
ruthwikdasyam Jun 18, 2026
750b085
hey jeff, in the rui interview meeting. just here to see if anyone joins
ruthwikdasyam Jun 18, 2026
a7fc7a7
feat: tests
ruthwikdasyam Jun 18, 2026
e1d0134
[autofix.ci] apply automated fixes
autofix-ci[bot] Jun 18, 2026
819febd
revert: jpeg debug
ruthwikdasyam Jun 18, 2026
b3e8d82
fix: mypy issues
ruthwikdasyam Jun 18, 2026
b3f234f
misc: test fixes + module list
ruthwikdasyam Jun 18, 2026
77bbfa6
Merge branch 'main' into ruthwik/datacollection
ruthwikdasyam Jun 18, 2026
b4e84ab
fix: greptile comments
ruthwikdasyam Jun 18, 2026
621289a
fix: dataprep fps sync + episode index/leak/lock fixes, monitor tests
ruthwikdasyam Jun 18, 2026
0b30ef1
fix: add blueprints to self hosted list
ruthwikdasyam Jun 18, 2026
2e61377
fix: redundant transport descriptions
ruthwikdasyam Jun 18, 2026
da17bea
feat: questaliases
ruthwikdasyam Jun 18, 2026
d006d62
refactor: source-stamp EpisodeStatus.ts, drop redundant start_ts
ruthwikdasyam Jun 19, 2026
1a71102
misc: todo for later
ruthwikdasyam Jun 19, 2026
8eca873
writer and inspector format validate
ruthwikdasyam Jun 19, 2026
97c9726
misc: simplification nearest check
ruthwikdasyam Jun 19, 2026
28f85ab
misc: comments instructions
ruthwikdasyam Jun 19, 2026
c69d4dc
fix: None retun for tests
ruthwikdasyam Jun 19, 2026
a1497dc
feat: lerobot v3.0
ruthwikdasyam Jun 19, 2026
6cf9c7e
[autofix.ci] apply automated fixes
autofix-ci[bot] Jun 19, 2026
01c1a57
fix: greptile issues
ruthwikdasyam Jun 19, 2026
d1f8916
fix: address greptile review — writer resource guard + per-episode ta…
ruthwikdasyam Jun 20, 2026
d542767
fix(dataprep): reject shared obs/action feature keys instead of silen…
ruthwikdasyam Jun 20, 2026
c9a8c05
Merge branch 'main' into ruthwik/datacollection
ruthwikdasyam Jun 20, 2026
132ac71
test(learning): drop __new__ shell for mocker-patched construction; h…
ruthwikdasyam Jun 21, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 73 additions & 0 deletions dimos/learning/collection/blueprint.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# Copyright 2026 Dimensional Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Recording blueprints.

`CollectionRecorder` (a memory2 Recorder) captures the obs/action/status
streams to a SQLite session DB during the run and flushes it durably on
shutdown. DataPrep reads that DB afterwards.
"""

from __future__ import annotations

from datetime import datetime

from dimos.core.coordination.blueprints import Blueprint, autoconnect
from dimos.core.global_config import global_config
from dimos.hardware.sensors.camera.realsense.camera import RealSenseCamera
from dimos.learning.collection.episode_monitor import EpisodeMonitorModule
from dimos.learning.collection.recorder import CollectionRecorder
from dimos.teleop.quest.blueprints import (
teleop_quest_piper,
teleop_quest_xarm7,
)


def _session_db(robot: str) -> str:
"""Timestamped session DB path, namespaced by robot."""
return f"data/recordings/session_{robot}_{datetime.now():%Y%m%d_%H%M%S}.db"


def _camera_if_real() -> tuple[Blueprint, ...]:
"""Real RealSense only off-sim. In `--simulation` the teleop coordinator's
MujocoSimModule already publishes color_image on /camera/color_image, so a
real camera would be redundant (and fail with no device connected)."""
if global_config.simulation:
return ()
return (RealSenseCamera.blueprint(enable_pointcloud=False),)


# buttons / color_image / joint_state / status are left to autoconnect — each
# name is unique across the composed blueprint, so it resolves to a stable
# /<name> topic shared by producer and recorder.
learning_collect_quest_xarm7 = autoconnect(
teleop_quest_xarm7,
*_camera_if_real(),
EpisodeMonitorModule.blueprint(), # default button_map: toggle=B, discard=Y
CollectionRecorder.blueprint(db_path=_session_db("xarm7")),
)


learning_collect_quest_piper = autoconnect(
teleop_quest_piper,
*_camera_if_real(),
EpisodeMonitorModule.blueprint(), # default button_map: toggle=B, discard=Y
CollectionRecorder.blueprint(db_path=_session_db("piper")),
)


__all__ = [
"learning_collect_quest_piper",
"learning_collect_quest_xarm7",
]
215 changes: 215 additions & 0 deletions dimos/learning/collection/episode_monitor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
# Copyright 2026 Dimensional Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Single point of teleop-input → EpisodeStatus translation.

Watches buttons / keyboard, runs the start/save/discard state machine,
publishes EpisodeStatus on every transition. RecordReplay (or whatever
records the bus) captures that stream into session.db; DataPrep reads
only the recorded EpisodeStatus events offline — never raw buttons or
keypresses.
"""

from __future__ import annotations

import threading
import time
from typing import Any, Literal

from pydantic import BaseModel
from reactivex.disposable import Disposable

from dimos.core.core import rpc
from dimos.core.module import Module, ModuleConfig
from dimos.core.stream import In, Out
from dimos.teleop.quest.quest_types import BUTTON_ALIASES, Buttons
from dimos.utils.logging_config import setup_logger

logger = setup_logger()


class EpisodeStatus(BaseModel):
ts: float
state: Literal["idle", "recording"]
episodes_saved: int
episodes_discarded: int
last_event: Literal["start", "save", "discard", "init"] = "init"
Comment on lines +45 to +47

@mustafab0 mustafab0 Jun 11, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A little confused here. Why is the EpisodeStatus counting episodes_saved/discarded ?

A single session would have multiple episodes inside it

@ruthwikdasyam ruthwikdasyam Jun 15, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. once we start a blueprint, we collect multiple episodes.
when we keep collecting episodes.. This msg will be live indication of how many we collected, and how many discarded.

task_label: str | None = None


class KeyPress(BaseModel):
"""Single keypress event from a keyboard input source."""

key: str
ts: float


class EpisodeMonitorModuleConfig(ModuleConfig):
button_map: dict[Literal["start", "save", "discard", "toggle"], str] = {
"toggle": "B",
"discard": "Y",
}
keyboard_map: dict[Literal["start", "save", "discard", "toggle"], str] = {}
default_task_label: str | None = None


class EpisodeMonitorModule(Module):
config: EpisodeMonitorModuleConfig

buttons: In[Buttons]
# TODO: no KeyPress producer exists yet — add a pygame keyboard module that
# publishes KeyPress so this port is actually fed (today only buttons drive it).
keyboard: In[KeyPress]
status: Out[EpisodeStatus]

def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
self._state: Literal["idle", "recording"] = "idle"
self._saved: int = 0
self._discarded: int = 0
self._last_event: Literal["start", "save", "discard", "init"] = "init"
self._prev_bits: dict[str, bool] = {} # rising-edge detection for buttons
self._lock = threading.Lock()

@rpc
def start(self) -> None:
super().start()
# Registered so the base Module.stop() disposes them on shutdown.
self.register_disposable(Disposable(self.buttons.subscribe(self._on_buttons)))
self.register_disposable(Disposable(self.keyboard.subscribe(self._on_keyboard)))
# Emit an initial idle status so subscribers (and recorders) have a
# known starting point in the timeline.
with self._lock:
status = self._snapshot("init", time.time())
self._emit(status)

@rpc
def stop(self) -> None:
super().stop()

@rpc
def reset_counters(self) -> EpisodeStatus:
with self._lock:
self._state = "idle"
self._saved = 0
self._discarded = 0
self._prev_bits = {}
status = self._snapshot("init", time.time())
return self._emit(status)

@rpc
def get_status(self) -> EpisodeStatus:
with self._lock:
return EpisodeStatus(
ts=time.time(),
state=self._state,
episodes_saved=self._saved,
episodes_discarded=self._discarded,
last_event=self._last_event,
task_label=self.config.default_task_label,
)

# ── port handlers ────────────────────────────────────────────────────────

def _on_buttons(self, msg: Buttons) -> None:
"""Rising-edge detect against `config.button_map`; advance state machine."""
ts = time.time()
# Edge-detect under the lock (it shares `_prev_bits` with reset_counters),
# then fire transitions outside it — `_transition` takes the same lock.
fired: list[Literal["start", "save", "discard", "toggle"]] = []
with self._lock:
for event_name, alias_or_attr in self.config.button_map.items():
attr = BUTTON_ALIASES.get(alias_or_attr, alias_or_attr)
try:
pressed = bool(getattr(msg, attr))
except AttributeError:
continue
prev = self._prev_bits.get(attr, False)
self._prev_bits[attr] = pressed
if pressed and not prev: # rising edge
fired.append(event_name)
for event_name in fired:
self._transition(event_name, ts)

def _on_keyboard(self, msg: KeyPress) -> None:
"""Match `msg.key` against `config.keyboard_map`; advance state machine."""
for event_name, key in self.config.keyboard_map.items():
if msg.key == key:
self._transition(event_name, msg.ts)
break

def _transition(self, event: Literal["start", "save", "discard", "toggle"], ts: float) -> None:
"""State-machine transition. Publishes EpisodeStatus on every change.

``toggle`` resolves to ``start`` when idle and ``save`` when recording,
so one button can begin and end a take. The resolved event is what gets
published (DataPrep only ever sees start/save/discard).
"""
with self._lock:
if event == "toggle":
event = "save" if self._state == "recording" else "start"
if event == "start":
# Auto-commit any in-progress episode (matches DataPrep extractor).
if self._state == "recording":
self._saved += 1
self._state = "recording"
elif event == "save":
if self._state == "recording":
self._saved += 1
self._state = "idle"
elif event == "discard":
if self._state == "recording":
self._discarded += 1
self._state = "idle"
# Snapshot under the mutation's lock so the event matches the state.
status = self._snapshot(event, ts)
self._emit(status)

def _snapshot(
self, last_event: Literal["start", "save", "discard", "init"], ts: float
) -> EpisodeStatus:
"""Build a status from current state. Caller must hold `self._lock`."""
self._last_event = last_event
return EpisodeStatus(
ts=ts,
state=self._state,
episodes_saved=self._saved,
episodes_discarded=self._discarded,
last_event=last_event,
task_label=self.config.default_task_label,
)

def _emit(self, status: EpisodeStatus) -> EpisodeStatus:
"""Publish + log a snapshot. Must run outside the lock (does I/O)."""
self.status.publish(status)
self._log_status(status)
return status

def _log_status(self, status: EpisodeStatus) -> None:
"""One-line operator feedback to the terminal on every transition."""
verb = {
"start": "▶ RECORDING episode",
"save": "✓ SAVED episode",
"discard": "✗ DISCARDED episode",
"init": "· ready",
}.get(status.last_event, status.last_event)
label = f" [{status.task_label}]" if status.task_label else ""
logger.info(
"[collect] %s%s (state=%s saved=%d discarded=%d)",
verb,
label,
status.state,
status.episodes_saved,
status.episodes_discarded,
)
51 changes: 51 additions & 0 deletions dimos/learning/collection/recorder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# Copyright 2026 Dimensional Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""CollectionRecorder — captures teleop collection streams to a memory2 DB.

A `Recorder` (memory2) subscribes each declared `In` port and appends every
message to a SQLite store, flushing durably on stop(). Only *connected*
streams are recorded, so the same recorder works for any arm whose
coordinator publishes `joint_state`.

The recorded stream names match what DataPrep reads: `color_image`
and `joint_state` (observation), `status` (episode segmentation).
"""

from __future__ import annotations

from pathlib import Path

from dimos.core.stream import In
from dimos.learning.collection.episode_monitor import EpisodeStatus
from dimos.memory2.module import Recorder, RecorderConfig
from dimos.msgs.sensor_msgs.Image import Image
from dimos.msgs.sensor_msgs.JointState import JointState


class CollectionRecorderConfig(RecorderConfig):
db_path: str | Path = "data/recordings/session.db"


class CollectionRecorder(Recorder):
"""Records the streams DataPrep consumes from a teleop session."""

config: CollectionRecorderConfig

color_image: In[Image] # observation (camera)
joint_state: In[JointState] # observation + action (measured/next state)
status: In[EpisodeStatus] # episode start/save/discard segmentation


__all__ = ["CollectionRecorder", "CollectionRecorderConfig"]
Loading
Loading