Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,33 @@
# Changelog

## [0.5.2] - 2026-05-21 - DX hardening for hardware engineers

### Fixed

- Error messages now reference `plexus init` instead of the non-existent `plexus start`.
- `close()` now attempts to flush any buffered points before tearing down the transport,
preventing silent data loss on graceful shutdown.
- `persistent_buffer` default changed from `False` to `True` — store-and-forward is now
on by default, matching the `~/.plexus/config.json` default and the right choice for
field hardware. Pass `persistent_buffer=False` to opt out (e.g. in test fixtures).

### Added

- `send_batch()` now accepts 3-tuples `(metric, value, timestamp)` alongside the existing
2-tuple form. Per-point timestamps let sensors on different interrupt timers share a
single batch call. 2-tuples continue to use the shared `timestamp` argument.
- `on_command()` now warns immediately via stderr if called after the WebSocket has already
authenticated, making the "register before first send()" ordering requirement visible
rather than silently broken.
- `source_id` is validated against `^[a-z0-9][a-z0-9_-]{1,62}$` at construction time.
Invalid names now raise `ValueError` with a clear message instead of failing obscurely
at the gateway.

### Changed

- `_say()` / `_QUIET` consolidated into a new internal `plexus/_log.py` module.
Previously duplicated verbatim between `client.py` and `ws.py`.

## [0.5.1] - 2026-05-19 - Binary video frames + non-blocking send

### Performance
Expand Down
6 changes: 3 additions & 3 deletions examples/uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion plexus/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@
from plexus.client import Plexus, read_mjpeg_frames
from plexus.ws import WebSocketTransport

__version__ = "0.5.1"
__version__ = "0.5.2"
__all__ = ["Plexus", "WebSocketTransport", "read_mjpeg_frames"]
15 changes: 15 additions & 0 deletions plexus/_log.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import os
import sys

_QUIET = os.environ.get("PLEXUS_QUIET", "").lower() in ("1", "true", "yes")


def _say(line: str) -> None:
"""Single-line status message to stderr. Skipped if PLEXUS_QUIET=1."""
if _QUIET:
return
try:
sys.stderr.write(f"[plexus] {line}\n")
sys.stderr.flush()
except Exception:
pass
75 changes: 52 additions & 23 deletions plexus/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,17 @@
px.send("temperature", read_temp())
time.sleep(0.01)

Note: Requires authentication. Run 'plexus start' or set PLEXUS_API_KEY.
Note: Requires authentication. Run 'plexus init' or set PLEXUS_API_KEY.
"""

import gzip
import json
import logging
import os
import re
import shutil
import socket
import subprocess
import sys
import threading
import time
import urllib.error
Expand Down Expand Up @@ -101,19 +101,7 @@ class _ConnError(OSError):
pass


# Status messages to stderr so users running `python my_script.py` see what's
# happening without having to configure logging. Set PLEXUS_QUIET=1 to disable.
_QUIET = os.environ.get("PLEXUS_QUIET", "").lower() in ("1", "true", "yes")


def _say(line: str) -> None:
if _QUIET:
return
try:
sys.stderr.write(f"[plexus] {line}\n")
sys.stderr.flush()
except Exception:
pass
from plexus._log import _say

# Flexible value type - supports any JSON-serializable value
FlexValue = Union[int, float, str, bool, Dict[str, Any], List[Any]]
Expand Down Expand Up @@ -161,6 +149,18 @@ class AuthenticationError(PlexusError):
pass


_SOURCE_ID_RE = re.compile(r'^[a-z0-9][a-z0-9_-]{1,62}$')


def _validate_source_id(source_id: str) -> None:
if not _SOURCE_ID_RE.match(source_id):
raise ValueError(
f"Invalid source_id {source_id!r}. "
"Must match ^[a-z0-9][a-z0-9_-]{1,62}$ "
"(lowercase letters, digits, hyphens, underscores; start with letter or digit)."
)


class Plexus:
"""
Client for sending sensor data to Plexus.
Expand All @@ -186,7 +186,7 @@ def __init__(
timeout: float = 10.0,
retry_config: Optional[RetryConfig] = None,
max_buffer_size: int = 10000,
persistent_buffer: bool = False,
persistent_buffer: bool = True,
buffer_path: Optional[str] = None,
transport: str = "ws",
ws_url: Optional[str] = None,
Expand All @@ -201,6 +201,7 @@ def __init__(
self.endpoint = (endpoint or get_endpoint()).rstrip("/")
self.gateway_url = get_gateway_url()
self.source_id = source_id or get_source_id()
_validate_source_id(self.source_id)
self.timeout = timeout
self.retry_config = retry_config or RetryConfig()
self._max_buffer_size = max_buffer_size
Expand Down Expand Up @@ -360,16 +361,19 @@ def event(

def send_batch(
self,
points: List[Tuple[str, FlexValue]],
points: List[Union[Tuple[str, FlexValue], Tuple[str, FlexValue, float]]],
timestamp: Optional[float] = None,
tags: Optional[Dict[str, str]] = None,
) -> bool:
"""
Send multiple metrics at once.

Args:
points: List of (metric, value) tuples. Values can be any FlexValue type.
timestamp: Shared timestamp for all points. If not provided, uses current time.
points: List of (metric, value) or (metric, value, timestamp) tuples.
Values can be any FlexValue type. Per-point timestamps override
the shared timestamp argument.
timestamp: Shared timestamp for points that don't supply their own.
If not provided, uses current time.
tags: Shared tags for all points

Returns:
Expand All @@ -382,9 +386,23 @@ def send_batch(
("robot.state", "RUNNING"),
("position", {"x": 1.0, "y": 2.0}),
])

# Per-point timestamps (e.g. sensors on different interrupt timers):
px.send_batch([
("imu.accel_x", 0.12, t_imu),
("pressure", 1013.2, t_baro),
("temperature", 22.4), # uses shared timestamp
])
"""
ts_ms = self._normalize_ts_ms(timestamp)
data_points = [self._make_point(m, v, ts_ms, tags) for m, v in points]
default_ts_ms = self._normalize_ts_ms(timestamp)
data_points = []
for p in points:
if len(p) == 3:
m, v, t = p
data_points.append(self._make_point(m, v, self._normalize_ts_ms(t), tags))
else:
m, v = p
data_points.append(self._make_point(m, v, default_ts_ms, tags))
return self._send_points(data_points)

def _ensure_ws(self):
Expand Down Expand Up @@ -656,6 +674,12 @@ def on_command(
if self.transport != "ws":
raise PlexusError("on_command requires transport='ws'")
ws = self._ensure_ws()
if ws.is_authenticated:
_say(
f"⚠ on_command('{name}') called after connection is already authenticated — "
"command will not be advertised to the dashboard until next reconnect. "
"Call on_command() before the first send()."
)
ws.register_command(name, handler, description=description, params=params)

def _send_points(self, points: List[Dict[str, Any]]) -> bool:
Expand All @@ -673,7 +697,7 @@ def _send_points(self, points: List[Dict[str, Any]]) -> bool:
"""
if not self.api_key:
raise AuthenticationError(
"No API key configured. Run 'plexus start' or set PLEXUS_API_KEY"
"No API key configured. Run 'plexus init' or set PLEXUS_API_KEY"
)

# Include any previously buffered points
Expand Down Expand Up @@ -905,7 +929,12 @@ def run(self, run_id: str, tags: Optional[Dict[str, str]] = None, store_frames:
self._store_frames = False

def close(self):
"""Close the client and release resources."""
"""Close the client, flush any buffered points, and release resources."""
if self.buffer_size() > 0:
try:
self.flush_buffer()
except Exception as e:
logger.debug("flush on close failed: %s", e)
if self._ws is not None:
self._ws.stop()
self._ws = None
Expand Down
18 changes: 1 addition & 17 deletions plexus/ws.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import queue
import random
import struct
import sys
import threading
import time
from dataclasses import dataclass, field
Expand All @@ -48,22 +47,7 @@

logger = logging.getLogger(__name__)

# By default, print connection status to stderr so users running
# `python my_script.py` can see what's happening without having to
# configure the logging module. Set PLEXUS_QUIET=1 to disable.
_QUIET = os.environ.get("PLEXUS_QUIET", "").lower() in ("1", "true", "yes")


def _say(line: str) -> None:
"""Single-line status message to stderr. Skipped if PLEXUS_QUIET=1."""
if _QUIET:
return
try:
sys.stderr.write(f"[plexus] {line}\n")
sys.stderr.flush()
except Exception:
# Stderr blew up — don't take the whole client down with it.
pass
from plexus._log import _say

AUTH_TIMEOUT_S = 10.0
HEARTBEAT_INTERVAL_S = 30.0
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "hatchling.build"

[project]
name = "plexus-python"
version = "0.5.1"
version = "0.5.2"
description = "Thin Python SDK for Plexus — send telemetry in one line"
readme = "README.md"
license = "Apache-2.0"
Expand Down
66 changes: 61 additions & 5 deletions tests/test_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ def test_default_config():

def test_client_init():
"""Client should initialize without error."""
px = Plexus(api_key="test_key", endpoint="http://localhost")
px = Plexus(api_key="test_key", endpoint="http://localhost", persistent_buffer=False)
assert px.api_key == "test_key"
assert px.endpoint == "http://localhost"


def test_make_point():
"""Client should create valid data points."""
px = Plexus(api_key="test", endpoint="http://localhost")
px = Plexus(api_key="test", endpoint="http://localhost", persistent_buffer=False)
point = px._make_point("temperature", 72.5)

assert point["metric"] == "temperature"
Expand All @@ -39,14 +39,14 @@ def test_make_point():

def test_make_point_with_tags():
"""Data points should include tags when provided."""
px = Plexus(api_key="test", endpoint="http://localhost")
px = Plexus(api_key="test", endpoint="http://localhost", persistent_buffer=False)
point = px._make_point("temperature", 72.5, tags={"sensor": "A1"})

assert point["tags"] == {"sensor": "A1"}


def test_normalize_ts_ms_applies_clock_offset():
px = Plexus(api_key="test", endpoint="http://localhost")
px = Plexus(api_key="test", endpoint="http://localhost", persistent_buffer=False)
px._clock_offset_ms = 5000
before = int(time.time() * 1000)
ts = px._normalize_ts_ms(None)
Expand All @@ -55,7 +55,63 @@ def test_normalize_ts_ms_applies_clock_offset():


def test_normalize_ts_ms_ignores_offset_for_supplied_timestamp():
px = Plexus(api_key="test", endpoint="http://localhost")
px = Plexus(api_key="test", endpoint="http://localhost", persistent_buffer=False)
px._clock_offset_ms = 5000
ts = px._normalize_ts_ms(1_700_000_000.0)
assert ts == 1_700_000_000_000


def test_send_batch_shared_timestamp():
"""All 2-tuple points share the same timestamp when none is supplied per-point."""
px = Plexus(api_key="test", endpoint="http://localhost", persistent_buffer=False)
t = 1_700_000_000.0
points = px._make_point # just verify via send_batch internals

# Build points directly to inspect timestamps
default_ts_ms = px._normalize_ts_ms(t)
from plexus.client import Plexus as _Plexus

batch_points = [("temp", 22.0), ("humidity", 55.0)]
data = []
for p in batch_points:
m, v = p
data.append(px._make_point(m, v, default_ts_ms, None))

assert data[0]["timestamp"] == data[1]["timestamp"] == int(t * 1000)


def test_send_batch_per_point_timestamps():
"""3-tuple entries use their own timestamp; 2-tuple entries use the shared default."""
px = Plexus(api_key="test", endpoint="http://localhost", persistent_buffer=False)
t_shared = 1_700_000_000.0
t_imu = 1_700_000_001.0
t_baro = 1_700_000_002.0

default_ts_ms = px._normalize_ts_ms(t_shared)
batch = [
("imu.accel_x", 0.12, t_imu),
("pressure", 1013.2, t_baro),
("temperature", 22.4),
]
data_points = []
for p in batch:
if len(p) == 3:
m, v, t = p
data_points.append(px._make_point(m, v, px._normalize_ts_ms(t), None))
else:
m, v = p
data_points.append(px._make_point(m, v, default_ts_ms, None))

assert data_points[0]["timestamp"] == int(t_imu * 1000)
assert data_points[1]["timestamp"] == int(t_baro * 1000)
assert data_points[2]["timestamp"] == int(t_shared * 1000)


def test_error_message_uses_plexus_init():
"""AuthenticationError for missing API key must reference 'plexus init', not 'plexus start'."""
import pytest
from plexus.client import AuthenticationError
px = Plexus(api_key="test_key", endpoint="http://localhost", persistent_buffer=False)
px.api_key = ""
with pytest.raises(AuthenticationError, match="plexus init"):
px._send_points([{"metric": "x", "value": 1, "timestamp": 0, "class": "metric"}])
2 changes: 2 additions & 0 deletions tests/test_retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ def client(self):
api_key="test_key",
endpoint="http://localhost:9999",
retry_config=RetryConfig(max_retries=2, base_delay=0.01, jitter=False),
persistent_buffer=False,
)

def test_retry_on_timeout(self, client):
Expand Down Expand Up @@ -213,6 +214,7 @@ def client(self):
endpoint="http://localhost:9999",
retry_config=RetryConfig(max_retries=1, base_delay=0.01, jitter=False),
max_buffer_size=100,
persistent_buffer=False,
)

def test_buffer_on_all_retries_failed(self, client):
Expand Down
Loading