diff --git a/AGENTS.md b/AGENTS.md index 0b578de..b13b0cc 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -9,6 +9,7 @@ Machine-readable interface for AI assistants and automation scripts. | `PLEXUS_API_KEY` | API key for authentication | `plx_xxxxx` | | `PLEXUS_ENDPOINT` | Server URL (default: `https://app.plexus.company`) | `https://custom.endpoint.com` | | `PLEXUS_ORG_ID` | Organization ID | `org_xxxxx` | +| `PLEXUS_WS_URL` | Gateway WebSocket URL (overrides API discovery) | `ws://localhost:8080` | ## CLI Commands diff --git a/plexus/adapters/base.py b/plexus/adapters/base.py index de8f4ab..1251b14 100644 --- a/plexus/adapters/base.py +++ b/plexus/adapters/base.py @@ -103,6 +103,7 @@ class Metric: timestamp: Unix timestamp (seconds). If None, current time is used. tags: Optional key-value metadata source_id: Optional source identifier + data_class: Pipeline data class - "metric" (default), "event", or "blob" Examples: Metric("temperature", 72.5) @@ -115,6 +116,7 @@ class Metric: timestamp: Optional[float] = None tags: Optional[Dict[str, str]] = None source_id: Optional[str] = None + data_class: str = "metric" def __post_init__(self): if self.timestamp is None: @@ -123,6 +125,7 @@ def __post_init__(self): def to_dict(self) -> Dict[str, Any]: """Convert to dictionary for API submission""" result = { + "class": self.data_class, "metric": self.name, "value": self.value, "timestamp": self.timestamp, diff --git a/plexus/client.py b/plexus/client.py index d0dff72..46b5383 100644 --- a/plexus/client.py +++ b/plexus/client.py @@ -162,6 +162,7 @@ def _make_point( value: FlexValue, timestamp: Optional[float] = None, tags: Optional[Dict[str, str]] = None, + data_class: str = "metric", ) -> Dict[str, Any]: """Create a data point dictionary. @@ -173,6 +174,7 @@ def _make_point( - list: Arrays, coordinates, multi-value readings """ point = { + "class": data_class, "metric": metric, "value": value, "timestamp": self._normalize_ts_ms(timestamp), @@ -189,6 +191,7 @@ def send( value: FlexValue, timestamp: Optional[float] = None, tags: Optional[Dict[str, str]] = None, + data_class: str = "metric", ) -> bool: """ Send a single metric value to Plexus. @@ -203,6 +206,7 @@ def send( - list: px.send("angles", [0.5, 1.2, -0.3]) timestamp: Unix timestamp. If not provided, uses current time. tags: Optional key-value tags for the metric + data_class: Pipeline data class - "metric" (default) or "event" Returns: True if successful @@ -214,10 +218,9 @@ def send( Example: px.send("temperature", 72.5) px.send("motor.rpm", 3450, tags={"motor_id": "A1"}) - px.send("robot.state", "IDLE") - px.send("position", {"x": 1.5, "y": 2.3, "z": 0.0}) + px.send("gps.status", {"fix": "lost"}, data_class="event") """ - point = self._make_point(metric, value, timestamp, tags) + point = self._make_point(metric, value, timestamp, tags, data_class) return self._send_points([point]) def send_batch( diff --git a/plexus/connector.py b/plexus/connector.py index 1d05844..c166ba2 100644 --- a/plexus/connector.py +++ b/plexus/connector.py @@ -125,7 +125,6 @@ def __init__( can_adapters=can_adapters, mavlink_connections=mavlink_connections, on_status=self.on_status, - persist_fn=self._persist_async, error_report_fn=self.report_error, buffer=self._buffer, endpoint=self.endpoint, @@ -215,14 +214,14 @@ def _get_local_ip() -> str: # ========================================================================= def _get_ws_url(self) -> str: - """Get PartyKit WebSocket URL.""" + """Get WebSocket URL for gateway or PartyKit.""" # 1. Explicit env var ws_endpoint = os.environ.get("PLEXUS_WS_URL") if ws_endpoint: base = ws_endpoint.rstrip("/") - if "/party/" in base: + if "/ws/" in base or "/party/" in base: return base - return f"{base}/party/{self.org_id}" + return f"{base}/ws/device" # 2. Discover from API try: diff --git a/plexus/streaming.py b/plexus/streaming.py index 1ef6f1e..1e08e35 100644 --- a/plexus/streaming.py +++ b/plexus/streaming.py @@ -15,6 +15,7 @@ import logging import threading import time +import uuid from io import BytesIO from typing import Optional, Callable, List, Dict, Any, TYPE_CHECKING @@ -63,11 +64,13 @@ def _do_upload(): class StreamManager: """Manages sensor and camera streams. + All telemetry is relayed over WebSocket. Persistence is a gateway/ + consumer concern — the agent has no per-stream "record" state. + Args: sensor_hub: SensorHub instance for reading sensors. camera_hub: CameraHub instance for capturing frames. on_status: Callback for status messages. - persist_fn: Async function to persist data points (called when recording). buffer: Optional buffer backend for store-and-forward. When provided, telemetry that fails to send over WebSocket is buffered locally instead of being lost. @@ -80,7 +83,6 @@ def __init__( can_adapters: Optional[List["DetectedCAN"]] = None, mavlink_connections: Optional[List["DetectedMAVLink"]] = None, on_status: Optional[Callable[[str], None]] = None, - persist_fn: Optional[Callable[[List[Dict[str, Any]]], Any]] = None, error_report_fn: Optional[Callable] = None, buffer: Optional["BufferBackend"] = None, store_frames: bool = False, @@ -95,7 +97,6 @@ def __init__( self.can_adapters = can_adapters or [] self.mavlink_connections = mavlink_connections or [] self.on_status = on_status or (lambda x: None) - self.persist_fn = persist_fn self.error_report_fn = error_report_fn self.buffer = buffer self.store_frames = store_frames @@ -111,7 +112,6 @@ def __init__( self._can_instances: Dict[str, Any] = {} # channel -> CANAdapter self._active_mavlink_streams: Dict[str, asyncio.Task] = {} self._mavlink_instances: Dict[str, Any] = {} # conn_string -> MAVLinkAdapter - self._recording: bool = False # ========================================================================= # WebSocket Send with Buffer Fallback @@ -120,10 +120,21 @@ def __init__( async def _send_or_buffer(self, ws, points: List[Dict[str, Any]]) -> bool: """Send telemetry over WebSocket, falling back to buffer on failure. + Wraps points in the pipeline envelope with version, trace_id, and + source identifiers for gateway routing. + Returns True if sent over WebSocket, False if buffered. """ try: - await ws.send(json.dumps({"type": "telemetry", "points": points})) + envelope = { + "type": "telemetry", + "v": 1, + "trace_id": uuid.uuid4().hex, + "source_id": self._source_id, + "points": points, + "ingested_at": int(time.time() * 1000), + } + await ws.send(json.dumps(envelope)) return True except (ConnectionClosed, ConnectionError, OSError): if self.buffer and points: @@ -138,25 +149,50 @@ async def _send_or_buffer(self, ws, points: List[Dict[str, Any]]) -> bool: async def start_stream(self, data: dict, ws): """Start streaming sensor data. + Each sensor is sampled at its own declared `sample_rate` (Hz). + The loop ticks at the fastest sensor's rate and reads each + sensor only when its interval is up. Sensors with different + rates coexist correctly — a 100Hz IMU and a 1Hz BME280 in the + same hub each run at their natural cadence. + Args (from dashboard): - store: bool - If True, persist to ClickHouse. If False, real-time only. metrics: list - Which metrics to stream (empty = all) - interval_ms: int - Sampling interval + interval_ms: int - Optional global rate cap in ms. When set, + no sensor samples faster than 1000/interval_ms Hz. When + omitted, sensors use their declared rates. """ stream_id = data.get("id", f"stream_{int(time.time())}") metrics = data.get("metrics", []) - interval_ms = data.get("interval_ms", 100) - store = data.get("store", False) - - self._recording = store + interval_ms = data.get("interval_ms") # optional global cap if not self.sensor_hub: self.on_status("No sensors configured") return - mode = "Recording" if store else "Streaming" + sensors = list(self.sensor_hub.sensors) + if not sensors: + self.on_status("No sensors available") + return + + # Apply optional global rate cap from the dashboard. + # Each sensor samples at min(sensor.sample_rate, cap_hz). + cap_hz = None + if interval_ms and interval_ms > 0: + cap_hz = 1000.0 / interval_ms + + def effective_rate(s) -> float: + rate = getattr(s, "sample_rate", 10.0) or 10.0 + if cap_hz is not None: + rate = min(rate, cap_hz) + return max(rate, 0.01) # floor at 0.01Hz (one read per 100s) + + # Loop tick = fastest sensor's interval + max_rate = max(effective_rate(s) for s in sensors) + tick = 1.0 / max_rate + metric_count = len(metrics) if metrics else "all" - self.on_status(f"{mode} {metric_count} metrics @ {interval_ms}ms") + cap_note = f" cap={cap_hz:.1f}Hz" if cap_hz is not None else "" + self.on_status(f"Streaming {metric_count} metrics (tick {tick*1000:.0f}ms{cap_note})") async def stream_loop(): # Parse metric filters (strip source_id prefix if present) @@ -164,25 +200,42 @@ async def stream_loop(): for m in metrics: filters.add(m.split(":", 1)[-1] if ":" in m else m) + # Per-sensor last-read timestamps + last_read = {id(s): 0.0 for s in sensors} + try: while stream_id in self._active_streams: - readings = self.sensor_hub.read_all() + now = time.time() + readings = [] + + for sensor in sensors: + if getattr(sensor, "_disabled", False): + continue + interval = 1.0 / effective_rate(sensor) + if now - last_read[id(sensor)] >= interval: + try: + readings.extend(sensor.read()) + except Exception as e: + logger.debug(f"Sensor read failed: {sensor.name}: {e}") + continue + last_read[id(sensor)] = now + if filters: readings = [r for r in readings if r.metric in filters] - points = [ - {"metric": r.metric, "value": r.value, "timestamp": int(time.time() * 1000)} - for r in readings - ] - - # Send to WebSocket (real-time), buffer on failure - await self._send_or_buffer(ws, points) - - # If recording, also persist via HTTP - if self._recording and points and self.persist_fn: - asyncio.create_task(self.persist_fn(points)) + if readings: + points = [ + { + "class": "metric", + "metric": r.metric, + "value": r.value, + "timestamp": int(time.time() * 1000), + } + for r in readings + ] + await self._send_or_buffer(ws, points) - await asyncio.sleep(interval_ms / 1000) + await asyncio.sleep(tick) except asyncio.CancelledError: pass except Exception as e: @@ -208,8 +261,6 @@ async def stop_stream(self, data: dict): del self._active_streams[stream_id] self.on_status("Stopped stream") - self._recording = False - async def configure_sensor(self, data: dict): """Configure a sensor's runtime parameters. @@ -289,6 +340,9 @@ async def camera_loop(): if frame: await ws.send(json.dumps({ "type": "video_frame", + "v": 1, + "trace_id": uuid.uuid4().hex, + "source_id": self._source_id, "camera_id": camera_id, "frame": base64.b64encode(frame.data).decode('ascii'), "width": frame.width, @@ -399,10 +453,6 @@ async def start_can_stream(self, data: dict, ws): dbc_path = data.get("dbc_path") interval_ms = data.get("interval_ms", 10) - store = data.get("store", False) - - if store: - self._recording = True try: from plexus.adapters.can import CANAdapter @@ -434,6 +484,7 @@ async def can_loop(): if metrics: points = [ { + "class": m.data_class, "metric": m.name, "value": m.value, "timestamp": int((m.timestamp or time.time()) * 1000), @@ -443,9 +494,6 @@ async def can_loop(): ] await self._send_or_buffer(ws, points) - if self._recording and self.persist_fn: - asyncio.create_task(self.persist_fn(points)) - await asyncio.sleep(interval_ms / 1000) except asyncio.CancelledError: pass @@ -516,12 +564,8 @@ async def start_mavlink_stream(self, data: dict, ws): self._cleanup_mavlink_instance(conn_str) interval_ms = data.get("interval_ms", 10) - store = data.get("store", False) include_messages = data.get("include_messages") - if store: - self._recording = True - try: from plexus.adapters.mavlink import MAVLinkAdapter @@ -549,6 +593,7 @@ async def mavlink_loop(): if metrics: points = [ { + "class": m.data_class, "metric": m.name, "value": m.value, "timestamp": int((m.timestamp or time.time()) * 1000), @@ -558,9 +603,6 @@ async def mavlink_loop(): ] await self._send_or_buffer(ws, points) - if self._recording and self.persist_fn: - asyncio.create_task(self.persist_fn(points)) - await asyncio.sleep(interval_ms / 1000) except asyncio.CancelledError: pass @@ -608,8 +650,6 @@ def _cleanup_mavlink_instance(self, conn_str: str): def cancel_all(self): """Cancel all active streams.""" - self._recording = False - for task in self._active_streams.values(): task.cancel() self._active_streams.clear()