plexus-python

Thin Python SDK for Plexus. Send telemetry to the Plexus gateway in one line. Storage, dashboards, alerts, and fleet management live in the platform — this package just ships your data.

Quick Start

pip install plexus-python

from plexus import Plexus

px = Plexus(api_key="plx_xxx", source_id="device-001")
px.send("temperature", 72.5)

Get an API key at app.plexus.company → Devices → Add Device.

Device identity

Every device needs a unique source_id. The recommended way to set one on a real host is the bootstrap script, which requires a device name up front:

curl -sL https://app.plexus.company/setup | bash -s -- \
  --key plx_xxx --name drone-01

The name must match ^[a-z0-9][a-z0-9_-]{1,62}$. setup.sh refuses to run unless you pass --name or a TTY is available to prompt for one. This is deliberate: the previous hostname fallback silently merged telemetry from cloned SD-card images that all booted as raspberrypi.
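If you generate device names in a provisioning script, you can check them against the pattern before calling setup.sh. The sketch below uses only the regular expression quoted above; the helper name is_valid_device_name is hypothetical and not part of the SDK.

```python
import re

# Pattern copied from the rule above: one lowercase letter or digit, followed
# by 1 to 62 lowercase letters, digits, underscores, or hyphens.
DEVICE_NAME_RE = re.compile(r"^[a-z0-9][a-z0-9_-]{1,62}$")


def is_valid_device_name(name: str) -> bool:
    """Return True if `name` satisfies the documented device-name pattern."""
    # fullmatch also rejects a trailing newline, which `$` alone would tolerate.
    return DEVICE_NAME_RE.fullmatch(name) is not None
```

Note that the pattern implies a minimum length of two characters and a maximum of 63.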

If two devices end up requesting the same name, the gateway auto-suffixes: the first connection gets drone-01, the second gets drone-01_2, the third drone-01_3, and so on. The SDK logs the rename at INFO and persists the assigned name to ~/.plexus/config.json so the device keeps its identity across reboots. Under the hood, a per-installation UUID (install_id, lazily generated on first run) is what lets the gateway tell "same device reconnecting" from "different device claiming the same name."
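To make the suffixing rule concrete, here is a minimal sketch of how a name registry keyed by install_id could behave. It is an illustration of the behavior described above, not the gateway's actual code; assign_name and the in-memory registry dict are assumptions.

```python
def assign_name(registry: dict[str, str], requested: str, install_id: str) -> str:
    """Return the name assigned to `install_id`, suffixing on collision.

    `registry` maps each assigned name to the install_id that owns it.
    """
    candidate = requested
    suffix = 2
    while True:
        owner = registry.get(candidate)
        if owner is None:
            # Name is free: claim it for this installation.
            registry[candidate] = install_id
            return candidate
        if owner == install_id:
            # Same device reconnecting: it keeps the name it already has.
            return candidate
        # Different device claiming a taken name: try name_2, name_3, ...
        candidate = f"{requested}_{suffix}"
        suffix += 1
```

With an empty registry, three installations requesting drone-01 receive drone-01, drone-01_2, and drone-01_3 in order, and each gets the same name back when it reconnects.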

In normal code, you usually just pass source_id=... explicitly to Plexus(...) and never have to think about it.

Core methods

send(metric, value) — stream a reading

The main method. Call it every time you have a new sensor reading.

px = Plexus(source_id="rig-01")   # reads PLEXUS_API_KEY from env

px.send("engine.rpm", 3450)
px.send("coolant.temp", 82.3)

metric is a dot-namespaced string ("motor.rpm", "gps.fix_quality"). value accepts any JSON-serializable type:

| Type | Example | When to use |
|---|---|---|
| float / int | 72.5, 3450 | Sensor readings, counters |
| str | "RUNNING", "E_STALL" | State machines, error codes |
| bool | True | Binary flags |
| dict | {"x": 1.5, "y": 2.3} | Vectors, structured readings |
| list | [0.5, 1.2, -0.3] | Waveforms, joint angles |
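If a value comes from an untrusted source (a parsed message, a third-party driver), you may want to check that it is JSON-serializable before sending it. The sketch below uses the standard json module; is_json_value is a hypothetical helper, and rejecting NaN and infinity (allow_nan=False) is a strict assumption on my part, since this README does not say how the gateway treats them.

```python
import json


def is_json_value(value) -> bool:
    """Return True if `value` can be encoded as strict JSON."""
    try:
        # allow_nan=False makes NaN and +/-Infinity raise ValueError, because
        # they are not valid JSON. This strictness is an assumption.
        json.dumps(value, allow_nan=False)
    except (TypeError, ValueError):
        return False
    return True
```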

Optional arguments:

  • tags={"motor_id": "A1"} — key-value labels for filtering in the dashboard
  • timestamp=t — explicit Unix timestamp in seconds; omit to let the SDK pick (see Timestamps)

send_batch(points) — send multiple readings at once

Use this when you sample several sensors together and want them to share a timestamp and land in one network call.

px.send_batch([
    ("temperature", 22.4),
    ("humidity",    58.1),
    ("pressure",    1013.2),
])

points is a list of (metric, value) tuples. All points share the same timestamp (now, unless you pass timestamp=t). For independent timestamps per point, call send() in a loop instead.
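The per-point alternative can be sketched as a small loop. It relies only on the send(metric, value, timestamp=t) call documented in this README; the helper name send_timestamped and the (metric, value, unix_seconds) tuple layout are assumptions for illustration.

```python
from typing import Iterable, Tuple


def send_timestamped(px, samples: Iterable[Tuple[str, object, float]]) -> int:
    """Send each (metric, value, unix_seconds) sample with its own timestamp.

    `px` is any client exposing send(metric, value, timestamp=...).
    Returns the number of points sent.
    """
    count = 0
    for metric, value, ts in samples:
        px.send(metric, value, timestamp=ts)
        count += 1
    return count
```

Unlike send_batch(), this makes one send() call per point, so it trades the single network call for timestamp accuracy.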

event(name, data) — record a discrete occurrence

Use event() for things that happen rather than things you measure continuously. Faults, state transitions, operator actions, log entries — anything you'd put on a timeline as a marker rather than plot as a graph.

px.event("fault",        "E-stop triggered")
px.event("state_change", {"from": "IDLE", "to": "RUNNING"})
px.event("sensor_error", {"sensor": "imu", "code": 42}, tags={"motor": "A"})

The platform displays events as markers overlaid on your telemetry charts, not as time-series lines.

run(run_id) — group data into a named recording

with px.run("thermal-cycle-001"):
    while running:
        px.send("temperature", read_temp())

All send() calls inside the context are tagged with run_id, making it easy to isolate and replay that slice of data in the dashboard.

Video streaming

Two methods depending on whether you control the capture loop or just have a URL.

send_video_frame(frame, camera_id) — send frames you capture yourself

Use this when your code owns the capture loop — a picamera2 callback, an OpenCV VideoCapture loop, or an FFmpeg pipe you manage. Pass each frame and the SDK ships it to Plexus over WebSocket.

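import cv2

cap = cv2.VideoCapture(0)          # first local camera
try:
    while True:
        ok, frame = cap.read()
        if not ok:                 # camera unplugged or stream ended
            break
        px.send_video_frame(frame, camera_id="front")
finally:
    cap.release()                  # free the device on exit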

Accepted frame types:

  • numpy ndarray (H × W × C) — from OpenCV or picamera2; requires opencv-python
  • JPEG bytes — passed through as-is, zero re-encode overhead
  • Other image bytes (PNG, BMP, WebP) — decoded and re-encoded as JPEG via Pillow; requires pip install plexus-python[video]
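If you hold raw image bytes and want to know which of these paths they would take, you can inspect the file signature yourself. This is a hypothetical caller-side helper based on the well-known magic numbers of each format; the README does not document how the SDK itself detects the format.

```python
def sniff_image_format(data: bytes) -> str:
    """Guess the image format from its leading bytes (file signature)."""
    if data[:3] == b"\xff\xd8\xff":
        return "jpeg"   # JPEG start-of-image marker
    if data[:8] == b"\x89PNG\r\n\x1a\n":
        return "png"    # 8-byte PNG signature
    if data[:2] == b"BM":
        return "bmp"    # BMP file header
    if data[:4] == b"RIFF" and data[8:12] == b"WEBP":
        return "webp"   # RIFF container with a WEBP form type
    return "unknown"
```

Bytes reported as "jpeg" are the ones the list above describes as passed through without re-encoding.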

camera_id identifies which camera the frame came from. Use distinct IDs when streaming from multiple cameras simultaneously ("front", "rear", "cam:0").

stream_camera(url, camera_id) — stream from an RTSP URL or file

Use this when you have an RTSP stream or video file and don't want to manage the capture loop yourself. The SDK runs FFmpeg internally and handles the rest. Requires FFmpeg on $PATH.

stop = px.stream_camera("rtsp://192.168.1.100/stream", camera_id="front")
# ... do other work ...
stop.set()  # stop streaming

Returns a threading.Event — call .set() to stop. Runs in a background thread so it doesn't block your main loop.
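The stop-event pattern itself is plain standard-library threading. The sketch below shows the general shape (a background loop that exits once the returned Event is set) with a hypothetical start_worker function; it does not reproduce the SDK's internals or its FFmpeg handling.

```python
import threading


def start_worker(step, interval_s: float = 0.01) -> threading.Event:
    """Run `step()` repeatedly in a daemon thread until the returned Event is set."""
    stop = threading.Event()

    def loop():
        while not stop.is_set():
            step()
            # wait() doubles as an interruptible sleep: it returns as soon
            # as stop.set() is called, so shutdown is prompt.
            stop.wait(interval_s)

    threading.Thread(target=loop, daemon=True).start()
    return stop
```

The caller keeps only the Event and calls .set() on it when the work should end, the same way the stream_camera() example does.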

Which to use: if you're piping from rpicam-vid, picamera2, or your own capture process, use send_video_frame(). If you have an RTSP URL or file path, use stream_camera().

Bring Your Own Protocol

This package ships no adapters, auto-detection, or daemons — just the client. Use whatever library you'd use anyway and pipe values into px.send().

# MAVLink (pymavlink)
for msg in conn:
    if msg.get_type() == "ATTITUDE":
        px.send("attitude.roll", msg.roll)

# CAN (python-can)
for msg in bus:
    px.send(f"can.0x{msg.arbitration_id:x}", int.from_bytes(msg.data, "big"))

# MQTT (paho-mqtt)
def on_message(_c, _u, msg):
    px.send(msg.topic.replace("/", "."), float(msg.payload))

# I2C sensor (Adafruit CircuitPython)
px.send("temperature", bme.temperature)

See examples/ for runnable versions of each.
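In practice, field data is messier than the snippets above: an MQTT payload may not be numeric at all. A minimal sketch of a defensive conversion step, using only the standard library, might look like this. The function name mqtt_to_point and the choice to drop non-numeric payloads are assumptions, not SDK behavior.

```python
from typing import Optional, Tuple


def mqtt_to_point(topic: str, payload: bytes) -> Optional[Tuple[str, float]]:
    """Map an MQTT message to (metric, value), or None if the payload is not numeric."""
    # "sensors/bme/temperature" -> "sensors.bme.temperature"
    metric = topic.strip("/").replace("/", ".")
    try:
        value = float(payload.decode("utf-8").strip())
    except (UnicodeDecodeError, ValueError):
        return None
    return metric, value
```

Inside an on_message callback you would call this and pass the result to px.send() only when it is not None.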

Reliability

Every send buffers locally before hitting the network, retries with exponential backoff, and keeps your data safe across outages. Enable SQLite persistence to survive restarts and power loss:

px = Plexus(persistent_buffer=True)
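For readers unfamiliar with exponential backoff, the idea can be sketched in a few lines. The base delay, cap, and function names below are illustrative assumptions; the SDK's real retry parameters are not stated in this README.

```python
import time


def backoff_delays(attempts: int, base_s: float = 1.0, cap_s: float = 60.0) -> list[float]:
    """Delays that double on each retry, capped at `cap_s`."""
    return [min(cap_s, base_s * 2 ** n) for n in range(attempts)]


def send_with_retry(send_once, delays, sleep=time.sleep) -> bool:
    """Call `send_once()` until it succeeds, sleeping between failed attempts.

    Makes len(delays) + 1 attempts in total. Returns True on success.
    """
    for delay in [*delays, None]:
        try:
            send_once()
            return True
        except OSError:  # includes ConnectionError and socket timeouts
            if delay is None:
                return False  # out of retries
            sleep(delay)
    return False
```

Under these assumed defaults, backoff_delays(5) yields 1, 2, 4, 8, and 16 seconds.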

Inspect how many points are buffered, or flush the buffer manually:

px.buffer_size()
px.flush_buffer()

Timestamps and clock correction

By default — px.send("temp", 72.5) with no timestamp argument — the SDK picks the time itself. Over WebSocket, it synchronizes with the gateway clock on every connection, so data lands at the right place on the timeline even if the device's system clock is wrong (no NTP on first boot, stale RTC, fresh OS image).

px.send("temperature", 72.5)                # SDK picks time; gateway-synced over WS
px.send("temperature", 72.5, timestamp=t)   # your timestamp, used as-is, no correction

Pass an explicit timestamp when you have a reliable external time source (GPS, trusted RTC, host NTP) or are replaying historical data with known timestamps.

Omit timestamp when the device may have booted without NTP — which is the default on Raspberry Pi, Jetson, and most embedded Linux boards without a network connection at first boot.

Known limits:

  • Clock sync refreshes on WebSocket (re)connect. A device with a drifting RTC that stays connected for many days accumulates uncorrected drift between reconnects.
  • HTTP-only transport (transport="http") does not receive clock sync — timestamps default to the uncorrected device clock.
  • send_batch() shares one timestamp across the whole batch. For per-point timestamps, call send() in a loop.
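The clock-correction idea, including the first limit above, can be illustrated with a simple offset model. This is a sketch under stated assumptions: the ClockSync class is hypothetical, it ignores network round-trip latency, and it is not the SDK's actual implementation.

```python
import time


class ClockSync:
    """Track the offset between the gateway clock and the local clock."""

    def __init__(self, local_clock=time.time):
        self._local_clock = local_clock
        self._offset_s = 0.0  # no correction until the first sync

    def on_connect(self, gateway_time_s: float) -> None:
        # Refreshed only when a connection is (re)established, so local
        # drift after this point stays uncorrected until the next reconnect.
        self._offset_s = gateway_time_s - self._local_clock()

    def now(self) -> float:
        """Local time shifted onto the gateway's timeline."""
        return self._local_clock() + self._offset_s
```

A device whose clock reads 1000.0 while the gateway reports 1700000000.0 would, ten local seconds later, stamp points at 1700000010.0.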

Transport

By default the SDK connects over a WebSocket to /ws/device on the gateway — same wire protocol as the C SDK. This gives you:

  • lower-latency streaming of telemetry,
  • live command delivery from the UI / API to the device.

If the socket is unavailable, sends transparently fall back to POST /ingest so no data is lost.

# default — ws with http fallback
px = Plexus()

# force http (legacy)
px = Plexus(transport="http")
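Conceptually, the fallback is a try-one-then-the-other decision per send. The sketch below shows that shape with two injected callables; send_with_fallback, ws_send, and http_post are hypothetical names, and the real SDK additionally buffers and retries as described under Reliability.

```python
def send_with_fallback(point, ws_send, http_post) -> str:
    """Try the WebSocket path first; fall back to HTTP if the socket is down.

    Returns the name of the transport that carried the point.
    """
    try:
        ws_send(point)
        return "ws"
    except ConnectionError:
        # Socket unavailable: deliver the same point over HTTP instead.
        http_post(point)
        return "http"
```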

Handling commands

Register a handler before the first send() so the command is advertised in the auth frame:

def reboot(name, params):
    delay = params.get("delay_s", 0)
    # ... reboot logic ...
    return {"ok": True, "delay": delay}

px = Plexus()
px.on_command("reboot", reboot, description="reboot the device")
px.send("temperature", 72.5)   # opens the socket, waits for auth

The SDK sends an ack frame before invoking the handler, then a result frame with whatever the handler returns (or an error frame if it raises).
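The ack, result, and error sequence can be sketched as a small dispatcher. The frame field names ("type", "command", "result", "error") and the handling of unregistered commands are assumptions for illustration; the actual wire format is defined by the gateway protocol, not by this example.

```python
def handle_command(handlers, name, params, emit) -> None:
    """Dispatch one command and emit frames in the order described above.

    `handlers` maps command names to callables taking (name, params);
    `emit` receives each outgoing frame as a dict.
    """
    handler = handlers.get(name)
    if handler is None:
        # Assumption: unknown commands are reported as errors.
        emit({"type": "error", "command": name, "error": "unknown command"})
        return
    emit({"type": "ack", "command": name})  # sent before the handler runs
    try:
        result = handler(name, params)
    except Exception as exc:
        emit({"type": "error", "command": name, "error": str(exc)})
    else:
        emit({"type": "result", "command": name, "result": result})
```

Because the ack goes out before the handler runs, a slow handler (for example, one that waits before rebooting) does not delay the acknowledgement.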

Environment Variables

| Variable | Description | Default |
|---|---|---|
| PLEXUS_API_KEY | API key (required) | none |
| PLEXUS_GATEWAY_URL | HTTP ingest URL | https://plexus-gateway.fly.dev |
| PLEXUS_GATEWAY_WS_URL | WebSocket URL | wss://plexus-gateway.fly.dev |
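If your own startup script wants to fail fast when the key is missing, it can resolve the same variables before constructing the client. The sketch below mirrors the names and defaults from the table; load_settings is a hypothetical helper, and the SDK reads these variables on its own.

```python
import os

# Defaults copied from the table above.
DEFAULT_GATEWAY_URL = "https://plexus-gateway.fly.dev"
DEFAULT_GATEWAY_WS_URL = "wss://plexus-gateway.fly.dev"


def load_settings(env=os.environ) -> dict:
    """Resolve Plexus settings from an environment mapping, applying defaults."""
    api_key = env.get("PLEXUS_API_KEY")
    if not api_key:
        raise RuntimeError("PLEXUS_API_KEY is not set")
    return {
        "api_key": api_key,
        "gateway_url": env.get("PLEXUS_GATEWAY_URL", DEFAULT_GATEWAY_URL),
        "gateway_ws_url": env.get("PLEXUS_GATEWAY_WS_URL", DEFAULT_GATEWAY_WS_URL),
    }
```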

Architecture

Your code ── px.send() ── WebSocket /ws/device (HTTP POST /ingest fallback) ──> plexus-gateway ──> ClickHouse + Dashboard

One thin path. No agent, no daemon, no adapters. If you want the full HardwareOps platform — dashboards, alerts, RCA, fleet views — that's the web UI at app.plexus.company. This package gets your data there.

License

Apache 2.0
