diff --git a/CHANGELOG.md b/CHANGELOG.md index 2ea1f77..62a60f1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,33 @@ # Changelog +## [0.5.2] - 2026-05-21 - DX hardening for hardware engineers + +### Fixed + +- Error messages now reference `plexus init` instead of the non-existent `plexus start`. +- `close()` now attempts to flush any buffered points before tearing down the transport, + preventing silent data loss on graceful shutdown. +- `persistent_buffer` default changed from `False` to `True` — store-and-forward is now + on by default, matching the `~/.plexus/config.json` default and the right choice for + field hardware. Pass `persistent_buffer=False` to opt out (e.g. in test fixtures). + +### Added + +- `send_batch()` now accepts 3-tuples `(metric, value, timestamp)` alongside the existing + 2-tuple form. Per-point timestamps let sensors on different interrupt timers share a + single batch call. 2-tuples continue to use the shared `timestamp` argument. +- `on_command()` now warns immediately via stderr if called after the WebSocket has already + authenticated, making the "register before first send()" ordering requirement visible + rather than silently broken. +- `source_id` is validated against `^[a-z0-9][a-z0-9_-]{1,62}$` at construction time. + Invalid names now raise `ValueError` with a clear message instead of failing obscurely + at the gateway. + +### Changed + +- `_say()` / `_QUIET` consolidated into a new internal `plexus/_log.py` module. + Previously duplicated verbatim between `client.py` and `ws.py`. + ## [0.5.1] - 2026-05-19 - Binary video frames + non-blocking send ### Performance diff --git a/examples/uv.lock b/examples/uv.lock index 57f430e..a489797 100644 --- a/examples/uv.lock +++ b/examples/uv.lock @@ -674,11 +674,11 @@ wheels = [ [[package]] name = "urllib3" -version = "2.6.3" +version = "2.7.0" source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/c7/24/5f1b3bdffd70275f6661c76461e25f024d5a38a46f04aaca912426a2b1d3/urllib3-2.6.3.tar.gz", hash = "sha256:1b62b6884944a57dbe321509ab94fd4d3b307075e0c2eae991ac71ee15ad38ed", size = 435556, upload-time = "2026-01-07T16:24:43.925Z" } +sdist = { url = "https://files.pythonhosted.org/packages/53/0c/06f8b233b8fd13b9e5ee11424ef85419ba0d8ba0b3138bf360be2ff56953/urllib3-2.7.0.tar.gz", hash = "sha256:231e0ec3b63ceb14667c67be60f2f2c40a518cb38b03af60abc813da26505f4c", size = 433602, upload-time = "2026-05-07T16:13:18.596Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/39/08/aaaad47bc4e9dc8c725e68f9d04865dbcb2052843ff09c97b08904852d84/urllib3-2.6.3-py3-none-any.whl", hash = "sha256:bf272323e553dfb2e87d9bfd225ca7b0f467b919d7bbd355436d3fd37cb0acd4", size = 131584, upload-time = "2026-01-07T16:24:42.685Z" }, + { url = "https://files.pythonhosted.org/packages/7f/3e/5db95bcf282c52709639744ca2a8b149baccf648e39c8cc87553df9eae0c/urllib3-2.7.0-py3-none-any.whl", hash = "sha256:9fb4c81ebbb1ce9531cce37674bbc6f1360472bc18ca9a553ede278ef7276897", size = 131087, upload-time = "2026-05-07T16:13:17.151Z" }, ] [[package]] diff --git a/plexus/__init__.py b/plexus/__init__.py index 67f81be..e74774d 100644 --- a/plexus/__init__.py +++ b/plexus/__init__.py @@ -10,5 +10,5 @@ from plexus.client import Plexus, read_mjpeg_frames from plexus.ws import WebSocketTransport -__version__ = "0.5.1" +__version__ = "0.5.2" __all__ = ["Plexus", "WebSocketTransport", "read_mjpeg_frames"] diff --git a/plexus/_log.py b/plexus/_log.py new file mode 100644 index 0000000..c447c40 --- /dev/null +++ b/plexus/_log.py @@ -0,0 +1,15 @@ +import os +import sys + +_QUIET = os.environ.get("PLEXUS_QUIET", "").lower() in ("1", "true", "yes") + + +def _say(line: str) -> None: + """Single-line status message to stderr. Skipped if PLEXUS_QUIET=1.""" + if _QUIET: + return + try: + sys.stderr.write(f"[plexus] {line}\n") + sys.stderr.flush() + except Exception: + pass diff --git a/plexus/client.py b/plexus/client.py index e8fd8fe..34fdd6e 100644 --- a/plexus/client.py +++ b/plexus/client.py @@ -30,17 +30,17 @@ px.send("temperature", read_temp()) time.sleep(0.01) -Note: Requires authentication. Run 'plexus start' or set PLEXUS_API_KEY. +Note: Requires authentication. Run 'plexus init' or set PLEXUS_API_KEY. """ import gzip import json import logging import os +import re import shutil import socket import subprocess -import sys import threading import time import urllib.error @@ -101,19 +101,7 @@ class _ConnError(OSError): pass -# Status messages to stderr so users running `python my_script.py` see what's -# happening without having to configure logging. Set PLEXUS_QUIET=1 to disable. -_QUIET = os.environ.get("PLEXUS_QUIET", "").lower() in ("1", "true", "yes") - - -def _say(line: str) -> None: - if _QUIET: - return - try: - sys.stderr.write(f"[plexus] {line}\n") - sys.stderr.flush() - except Exception: - pass +from plexus._log import _say # Flexible value type - supports any JSON-serializable value FlexValue = Union[int, float, str, bool, Dict[str, Any], List[Any]] @@ -161,6 +149,18 @@ class AuthenticationError(PlexusError): pass +_SOURCE_ID_RE = re.compile(r'^[a-z0-9][a-z0-9_-]{1,62}$') + + +def _validate_source_id(source_id: str) -> None: + if not _SOURCE_ID_RE.match(source_id): + raise ValueError( + f"Invalid source_id {source_id!r}. " + "Must match ^[a-z0-9][a-z0-9_-]{1,62}$ " + "(lowercase letters, digits, hyphens, underscores; start with letter or digit)." + ) + + class Plexus: """ Client for sending sensor data to Plexus. @@ -186,7 +186,7 @@ def __init__( timeout: float = 10.0, retry_config: Optional[RetryConfig] = None, max_buffer_size: int = 10000, - persistent_buffer: bool = False, + persistent_buffer: bool = True, buffer_path: Optional[str] = None, transport: str = "ws", ws_url: Optional[str] = None, @@ -201,6 +201,7 @@ def __init__( self.endpoint = (endpoint or get_endpoint()).rstrip("/") self.gateway_url = get_gateway_url() self.source_id = source_id or get_source_id() + _validate_source_id(self.source_id) self.timeout = timeout self.retry_config = retry_config or RetryConfig() self._max_buffer_size = max_buffer_size @@ -360,7 +361,7 @@ def event( def send_batch( self, - points: List[Tuple[str, FlexValue]], + points: List[Union[Tuple[str, FlexValue], Tuple[str, FlexValue, float]]], timestamp: Optional[float] = None, tags: Optional[Dict[str, str]] = None, ) -> bool: @@ -368,8 +369,11 @@ def send_batch( Send multiple metrics at once. Args: - points: List of (metric, value) tuples. Values can be any FlexValue type. - timestamp: Shared timestamp for all points. If not provided, uses current time. + points: List of (metric, value) or (metric, value, timestamp) tuples. + Values can be any FlexValue type. Per-point timestamps override + the shared timestamp argument. + timestamp: Shared timestamp for points that don't supply their own. + If not provided, uses current time. tags: Shared tags for all points Returns: @@ -382,9 +386,23 @@ def send_batch( ("robot.state", "RUNNING"), ("position", {"x": 1.0, "y": 2.0}), ]) + + # Per-point timestamps (e.g. sensors on different interrupt timers): + px.send_batch([ + ("imu.accel_x", 0.12, t_imu), + ("pressure", 1013.2, t_baro), + ("temperature", 22.4), # uses shared timestamp + ]) """ - ts_ms = self._normalize_ts_ms(timestamp) - data_points = [self._make_point(m, v, ts_ms, tags) for m, v in points] + default_ts_ms = self._normalize_ts_ms(timestamp) + data_points = [] + for p in points: + if len(p) == 3: + m, v, t = p + data_points.append(self._make_point(m, v, self._normalize_ts_ms(t), tags)) + else: + m, v = p + data_points.append(self._make_point(m, v, default_ts_ms, tags)) return self._send_points(data_points) def _ensure_ws(self): @@ -656,6 +674,12 @@ def on_command( if self.transport != "ws": raise PlexusError("on_command requires transport='ws'") ws = self._ensure_ws() + if ws.is_authenticated: + _say( + f"⚠ on_command('{name}') called after connection is already authenticated — " + "command will not be advertised to the dashboard until next reconnect. " + "Call on_command() before the first send()." + ) ws.register_command(name, handler, description=description, params=params) def _send_points(self, points: List[Dict[str, Any]]) -> bool: @@ -673,7 +697,7 @@ def _send_points(self, points: List[Dict[str, Any]]) -> bool: """ if not self.api_key: raise AuthenticationError( - "No API key configured. Run 'plexus start' or set PLEXUS_API_KEY" + "No API key configured. Run 'plexus init' or set PLEXUS_API_KEY" ) # Include any previously buffered points @@ -905,7 +929,12 @@ def run(self, run_id: str, tags: Optional[Dict[str, str]] = None, store_frames: self._store_frames = False def close(self): - """Close the client and release resources.""" + """Close the client, flush any buffered points, and release resources.""" + if self.buffer_size() > 0: + try: + self.flush_buffer() + except Exception as e: + logger.debug("flush on close failed: %s", e) if self._ws is not None: self._ws.stop() self._ws = None diff --git a/plexus/ws.py b/plexus/ws.py index 93a9101..81555b6 100644 --- a/plexus/ws.py +++ b/plexus/ws.py @@ -32,7 +32,6 @@ import queue import random import struct -import sys import threading import time from dataclasses import dataclass, field @@ -48,22 +47,7 @@ logger = logging.getLogger(__name__) -# By default, print connection status to stderr so users running -# `python my_script.py` can see what's happening without having to -# configure the logging module. Set PLEXUS_QUIET=1 to disable. -_QUIET = os.environ.get("PLEXUS_QUIET", "").lower() in ("1", "true", "yes") - - -def _say(line: str) -> None: - """Single-line status message to stderr. Skipped if PLEXUS_QUIET=1.""" - if _QUIET: - return - try: - sys.stderr.write(f"[plexus] {line}\n") - sys.stderr.flush() - except Exception: - # Stderr blew up — don't take the whole client down with it. - pass +from plexus._log import _say AUTH_TIMEOUT_S = 10.0 HEARTBEAT_INTERVAL_S = 30.0 diff --git a/pyproject.toml b/pyproject.toml index 3ba56eb..cbceba3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "plexus-python" -version = "0.5.1" +version = "0.5.2" description = "Thin Python SDK for Plexus — send telemetry in one line" readme = "README.md" license = "Apache-2.0" diff --git a/tests/test_basic.py b/tests/test_basic.py index bf675b4..5709b1e 100644 --- a/tests/test_basic.py +++ b/tests/test_basic.py @@ -21,14 +21,14 @@ def test_default_config(): def test_client_init(): """Client should initialize without error.""" - px = Plexus(api_key="test_key", endpoint="http://localhost") + px = Plexus(api_key="test_key", endpoint="http://localhost", persistent_buffer=False) assert px.api_key == "test_key" assert px.endpoint == "http://localhost" def test_make_point(): """Client should create valid data points.""" - px = Plexus(api_key="test", endpoint="http://localhost") + px = Plexus(api_key="test", endpoint="http://localhost", persistent_buffer=False) point = px._make_point("temperature", 72.5) assert point["metric"] == "temperature" @@ -39,14 +39,14 @@ def test_make_point(): def test_make_point_with_tags(): """Data points should include tags when provided.""" - px = Plexus(api_key="test", endpoint="http://localhost") + px = Plexus(api_key="test", endpoint="http://localhost", persistent_buffer=False) point = px._make_point("temperature", 72.5, tags={"sensor": "A1"}) assert point["tags"] == {"sensor": "A1"} def test_normalize_ts_ms_applies_clock_offset(): - px = Plexus(api_key="test", endpoint="http://localhost") + px = Plexus(api_key="test", endpoint="http://localhost", persistent_buffer=False) px._clock_offset_ms = 5000 before = int(time.time() * 1000) ts = px._normalize_ts_ms(None) @@ -55,7 +55,63 @@ def test_normalize_ts_ms_applies_clock_offset(): def test_normalize_ts_ms_ignores_offset_for_supplied_timestamp(): - px = Plexus(api_key="test", endpoint="http://localhost") + px = Plexus(api_key="test", endpoint="http://localhost", persistent_buffer=False) px._clock_offset_ms = 5000 ts = px._normalize_ts_ms(1_700_000_000.0) assert ts == 1_700_000_000_000 + + +def test_send_batch_shared_timestamp(): + """All 2-tuple points share the same timestamp when none is supplied per-point.""" + px = Plexus(api_key="test", endpoint="http://localhost", persistent_buffer=False) + t = 1_700_000_000.0 + points = px._make_point # just verify via send_batch internals + + # Build points directly to inspect timestamps + default_ts_ms = px._normalize_ts_ms(t) + from plexus.client import Plexus as _Plexus + + batch_points = [("temp", 22.0), ("humidity", 55.0)] + data = [] + for p in batch_points: + m, v = p + data.append(px._make_point(m, v, default_ts_ms, None)) + + assert data[0]["timestamp"] == data[1]["timestamp"] == int(t * 1000) + + +def test_send_batch_per_point_timestamps(): + """3-tuple entries use their own timestamp; 2-tuple entries use the shared default.""" + px = Plexus(api_key="test", endpoint="http://localhost", persistent_buffer=False) + t_shared = 1_700_000_000.0 + t_imu = 1_700_000_001.0 + t_baro = 1_700_000_002.0 + + default_ts_ms = px._normalize_ts_ms(t_shared) + batch = [ + ("imu.accel_x", 0.12, t_imu), + ("pressure", 1013.2, t_baro), + ("temperature", 22.4), + ] + data_points = [] + for p in batch: + if len(p) == 3: + m, v, t = p + data_points.append(px._make_point(m, v, px._normalize_ts_ms(t), None)) + else: + m, v = p + data_points.append(px._make_point(m, v, default_ts_ms, None)) + + assert data_points[0]["timestamp"] == int(t_imu * 1000) + assert data_points[1]["timestamp"] == int(t_baro * 1000) + assert data_points[2]["timestamp"] == int(t_shared * 1000) + + +def test_error_message_uses_plexus_init(): + """AuthenticationError for missing API key must reference 'plexus init', not 'plexus start'.""" + import pytest + from plexus.client import AuthenticationError + px = Plexus(api_key="test_key", endpoint="http://localhost", persistent_buffer=False) + px.api_key = "" + with pytest.raises(AuthenticationError, match="plexus init"): + px._send_points([{"metric": "x", "value": 1, "timestamp": 0, "class": "metric"}]) diff --git a/tests/test_retry.py b/tests/test_retry.py index acc6793..1ca6b47 100644 --- a/tests/test_retry.py +++ b/tests/test_retry.py @@ -59,6 +59,7 @@ def client(self): api_key="test_key", endpoint="http://localhost:9999", retry_config=RetryConfig(max_retries=2, base_delay=0.01, jitter=False), + persistent_buffer=False, ) def test_retry_on_timeout(self, client): @@ -213,6 +214,7 @@ def client(self): endpoint="http://localhost:9999", retry_config=RetryConfig(max_retries=1, base_delay=0.01, jitter=False), max_buffer_size=100, + persistent_buffer=False, ) def test_buffer_on_all_retries_failed(self, client):