Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ Machine-readable interface for AI assistants and automation scripts.
| `PLEXUS_API_KEY` | API key for authentication | `plx_xxxxx` |
| `PLEXUS_ENDPOINT` | Server URL (default: `https://app.plexus.company`) | `https://custom.endpoint.com` |
| `PLEXUS_ORG_ID` | Organization ID | `org_xxxxx` |
| `PLEXUS_WS_URL` | Gateway WebSocket URL (overrides API discovery) | `ws://localhost:8080` |

## CLI Commands

Expand Down
3 changes: 3 additions & 0 deletions plexus/adapters/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ class Metric:
timestamp: Unix timestamp (seconds). If None, current time is used.
tags: Optional key-value metadata
source_id: Optional source identifier
data_class: Pipeline data class - "metric" (default), "event", or "blob"

Examples:
Metric("temperature", 72.5)
Expand All @@ -115,6 +116,7 @@ class Metric:
timestamp: Optional[float] = None
tags: Optional[Dict[str, str]] = None
source_id: Optional[str] = None
data_class: str = "metric"

def __post_init__(self):
if self.timestamp is None:
Expand All @@ -123,6 +125,7 @@ def __post_init__(self):
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary for API submission"""
result = {
"class": self.data_class,
"metric": self.name,
"value": self.value,
"timestamp": self.timestamp,
Expand Down
9 changes: 6 additions & 3 deletions plexus/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ def _make_point(
value: FlexValue,
timestamp: Optional[float] = None,
tags: Optional[Dict[str, str]] = None,
data_class: str = "metric",
) -> Dict[str, Any]:
"""Create a data point dictionary.

Expand All @@ -173,6 +174,7 @@ def _make_point(
- list: Arrays, coordinates, multi-value readings
"""
point = {
"class": data_class,
"metric": metric,
"value": value,
"timestamp": self._normalize_ts_ms(timestamp),
Expand All @@ -189,6 +191,7 @@ def send(
value: FlexValue,
timestamp: Optional[float] = None,
tags: Optional[Dict[str, str]] = None,
data_class: str = "metric",
) -> bool:
"""
Send a single metric value to Plexus.
Expand All @@ -203,6 +206,7 @@ def send(
- list: px.send("angles", [0.5, 1.2, -0.3])
timestamp: Unix timestamp. If not provided, uses current time.
tags: Optional key-value tags for the metric
data_class: Pipeline data class - "metric" (default) or "event"

Returns:
True if successful
Expand All @@ -214,10 +218,9 @@ def send(
Example:
px.send("temperature", 72.5)
px.send("motor.rpm", 3450, tags={"motor_id": "A1"})
px.send("robot.state", "IDLE")
px.send("position", {"x": 1.5, "y": 2.3, "z": 0.0})
px.send("gps.status", {"fix": "lost"}, data_class="event")
"""
point = self._make_point(metric, value, timestamp, tags)
point = self._make_point(metric, value, timestamp, tags, data_class)
return self._send_points([point])

def send_batch(
Expand Down
7 changes: 3 additions & 4 deletions plexus/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ def __init__(
can_adapters=can_adapters,
mavlink_connections=mavlink_connections,
on_status=self.on_status,
persist_fn=self._persist_async,
error_report_fn=self.report_error,
buffer=self._buffer,
endpoint=self.endpoint,
Expand Down Expand Up @@ -215,14 +214,14 @@ def _get_local_ip() -> str:
# =========================================================================

def _get_ws_url(self) -> str:
"""Get PartyKit WebSocket URL."""
"""Get WebSocket URL for gateway or PartyKit."""
# 1. Explicit env var
ws_endpoint = os.environ.get("PLEXUS_WS_URL")
if ws_endpoint:
base = ws_endpoint.rstrip("/")
if "/party/" in base:
if "/ws/" in base or "/party/" in base:
return base
return f"{base}/party/{self.org_id}"
return f"{base}/ws/device"

# 2. Discover from API
try:
Expand Down
128 changes: 84 additions & 44 deletions plexus/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import logging
import threading
import time
import uuid
from io import BytesIO
from typing import Optional, Callable, List, Dict, Any, TYPE_CHECKING

Expand Down Expand Up @@ -63,11 +64,13 @@ def _do_upload():
class StreamManager:
"""Manages sensor and camera streams.

All telemetry is relayed over WebSocket. Persistence is a gateway/
consumer concern — the agent has no per-stream "record" state.

Args:
sensor_hub: SensorHub instance for reading sensors.
camera_hub: CameraHub instance for capturing frames.
on_status: Callback for status messages.
persist_fn: Async function to persist data points (called when recording).
buffer: Optional buffer backend for store-and-forward. When provided,
telemetry that fails to send over WebSocket is buffered locally
instead of being lost.
Expand All @@ -80,7 +83,6 @@ def __init__(
can_adapters: Optional[List["DetectedCAN"]] = None,
mavlink_connections: Optional[List["DetectedMAVLink"]] = None,
on_status: Optional[Callable[[str], None]] = None,
persist_fn: Optional[Callable[[List[Dict[str, Any]]], Any]] = None,
error_report_fn: Optional[Callable] = None,
buffer: Optional["BufferBackend"] = None,
store_frames: bool = False,
Expand All @@ -95,7 +97,6 @@ def __init__(
self.can_adapters = can_adapters or []
self.mavlink_connections = mavlink_connections or []
self.on_status = on_status or (lambda x: None)
self.persist_fn = persist_fn
self.error_report_fn = error_report_fn
self.buffer = buffer
self.store_frames = store_frames
Expand All @@ -111,7 +112,6 @@ def __init__(
self._can_instances: Dict[str, Any] = {} # channel -> CANAdapter
self._active_mavlink_streams: Dict[str, asyncio.Task] = {}
self._mavlink_instances: Dict[str, Any] = {} # conn_string -> MAVLinkAdapter
self._recording: bool = False

# =========================================================================
# WebSocket Send with Buffer Fallback
Expand All @@ -120,10 +120,21 @@ def __init__(
async def _send_or_buffer(self, ws, points: List[Dict[str, Any]]) -> bool:
"""Send telemetry over WebSocket, falling back to buffer on failure.

Wraps points in the pipeline envelope with version, trace_id, and
source identifiers for gateway routing.

Returns True if sent over WebSocket, False if buffered.
"""
try:
await ws.send(json.dumps({"type": "telemetry", "points": points}))
envelope = {
"type": "telemetry",
"v": 1,
"trace_id": uuid.uuid4().hex,
"source_id": self._source_id,
"points": points,
"ingested_at": int(time.time() * 1000),
}
await ws.send(json.dumps(envelope))
return True
except (ConnectionClosed, ConnectionError, OSError):
if self.buffer and points:
Expand All @@ -138,51 +149,93 @@ async def _send_or_buffer(self, ws, points: List[Dict[str, Any]]) -> bool:
async def start_stream(self, data: dict, ws):
"""Start streaming sensor data.

Each sensor is sampled at its own declared `sample_rate` (Hz).
The loop ticks at the fastest sensor's rate and reads each
sensor only when its interval is up. Sensors with different
rates coexist correctly — a 100Hz IMU and a 1Hz BME280 in the
same hub each run at their natural cadence.

Args (from dashboard):
store: bool - If True, persist to ClickHouse. If False, real-time only.
metrics: list - Which metrics to stream (empty = all)
interval_ms: int - Sampling interval
interval_ms: int - Optional global rate cap in ms. When set,
no sensor samples faster than 1000/interval_ms Hz. When
omitted, sensors use their declared rates.
"""
stream_id = data.get("id", f"stream_{int(time.time())}")
metrics = data.get("metrics", [])
interval_ms = data.get("interval_ms", 100)
store = data.get("store", False)

self._recording = store
interval_ms = data.get("interval_ms") # optional global cap

if not self.sensor_hub:
self.on_status("No sensors configured")
return

mode = "Recording" if store else "Streaming"
sensors = list(self.sensor_hub.sensors)
if not sensors:
self.on_status("No sensors available")
return

# Apply optional global rate cap from the dashboard.
# Each sensor samples at min(sensor.sample_rate, cap_hz).
cap_hz = None
if interval_ms and interval_ms > 0:
cap_hz = 1000.0 / interval_ms

def effective_rate(s) -> float:
rate = getattr(s, "sample_rate", 10.0) or 10.0
if cap_hz is not None:
rate = min(rate, cap_hz)
return max(rate, 0.01) # floor at 0.01Hz (one read per 100s)

# Loop tick = fastest sensor's interval
max_rate = max(effective_rate(s) for s in sensors)
tick = 1.0 / max_rate

metric_count = len(metrics) if metrics else "all"
self.on_status(f"{mode} {metric_count} metrics @ {interval_ms}ms")
cap_note = f" cap={cap_hz:.1f}Hz" if cap_hz is not None else ""
self.on_status(f"Streaming {metric_count} metrics (tick {tick*1000:.0f}ms{cap_note})")

async def stream_loop():
# Parse metric filters (strip source_id prefix if present)
filters = set()
for m in metrics:
filters.add(m.split(":", 1)[-1] if ":" in m else m)

# Per-sensor last-read timestamps
last_read = {id(s): 0.0 for s in sensors}

try:
while stream_id in self._active_streams:
readings = self.sensor_hub.read_all()
now = time.time()
readings = []

for sensor in sensors:
if getattr(sensor, "_disabled", False):
continue
interval = 1.0 / effective_rate(sensor)
if now - last_read[id(sensor)] >= interval:
try:
readings.extend(sensor.read())
except Exception as e:
logger.debug(f"Sensor read failed: {sensor.name}: {e}")
continue
last_read[id(sensor)] = now

if filters:
readings = [r for r in readings if r.metric in filters]

points = [
{"metric": r.metric, "value": r.value, "timestamp": int(time.time() * 1000)}
for r in readings
]

# Send to WebSocket (real-time), buffer on failure
await self._send_or_buffer(ws, points)

# If recording, also persist via HTTP
if self._recording and points and self.persist_fn:
asyncio.create_task(self.persist_fn(points))
if readings:
points = [
{
"class": "metric",
"metric": r.metric,
"value": r.value,
"timestamp": int(time.time() * 1000),
}
for r in readings
]
await self._send_or_buffer(ws, points)

await asyncio.sleep(interval_ms / 1000)
await asyncio.sleep(tick)
except asyncio.CancelledError:
pass
except Exception as e:
Expand All @@ -208,8 +261,6 @@ async def stop_stream(self, data: dict):
del self._active_streams[stream_id]
self.on_status("Stopped stream")

self._recording = False

async def configure_sensor(self, data: dict):
"""Configure a sensor's runtime parameters.

Expand Down Expand Up @@ -289,6 +340,9 @@ async def camera_loop():
if frame:
await ws.send(json.dumps({
"type": "video_frame",
"v": 1,
"trace_id": uuid.uuid4().hex,
"source_id": self._source_id,
"camera_id": camera_id,
"frame": base64.b64encode(frame.data).decode('ascii'),
"width": frame.width,
Expand Down Expand Up @@ -399,10 +453,6 @@ async def start_can_stream(self, data: dict, ws):

dbc_path = data.get("dbc_path")
interval_ms = data.get("interval_ms", 10)
store = data.get("store", False)

if store:
self._recording = True

try:
from plexus.adapters.can import CANAdapter
Expand Down Expand Up @@ -434,6 +484,7 @@ async def can_loop():
if metrics:
points = [
{
"class": m.data_class,
"metric": m.name,
"value": m.value,
"timestamp": int((m.timestamp or time.time()) * 1000),
Expand All @@ -443,9 +494,6 @@ async def can_loop():
]
await self._send_or_buffer(ws, points)

if self._recording and self.persist_fn:
asyncio.create_task(self.persist_fn(points))

await asyncio.sleep(interval_ms / 1000)
except asyncio.CancelledError:
pass
Expand Down Expand Up @@ -516,12 +564,8 @@ async def start_mavlink_stream(self, data: dict, ws):
self._cleanup_mavlink_instance(conn_str)

interval_ms = data.get("interval_ms", 10)
store = data.get("store", False)
include_messages = data.get("include_messages")

if store:
self._recording = True

try:
from plexus.adapters.mavlink import MAVLinkAdapter

Expand Down Expand Up @@ -549,6 +593,7 @@ async def mavlink_loop():
if metrics:
points = [
{
"class": m.data_class,
"metric": m.name,
"value": m.value,
"timestamp": int((m.timestamp or time.time()) * 1000),
Expand All @@ -558,9 +603,6 @@ async def mavlink_loop():
]
await self._send_or_buffer(ws, points)

if self._recording and self.persist_fn:
asyncio.create_task(self.persist_fn(points))

await asyncio.sleep(interval_ms / 1000)
except asyncio.CancelledError:
pass
Expand Down Expand Up @@ -608,8 +650,6 @@ def _cleanup_mavlink_instance(self, conn_str: str):

def cancel_all(self):
"""Cancel all active streams."""
self._recording = False

for task in self._active_streams.values():
task.cancel()
self._active_streams.clear()
Expand Down
Loading