Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,30 @@
# Changelog

## [0.4.8] - 2026-05-19 - Video input broadening and wire safety

### Added

- `send_video_frame` now accepts raw bytes/bytearray: JPEG bytes are passed
through without re-encoding (zero CPU cost on hardware that outputs JPEG
natively); other formats (PNG, BMP, WebP) are decoded via Pillow and
re-encoded as JPEG. Install `plexus-python[video]` for Pillow support.
- `stream_camera(url, camera_id, fps, quality)` — streams from any
FFmpeg-supported source (RTSP, video file, capture device). Requires FFmpeg
on `$PATH`. Returns a `threading.Event`; call `.set()` to stop.
- `read_mjpeg_frames(pipe)` — public generator that parses raw MJPEG byte
streams (e.g. FFmpeg stdout) into individual JPEG frames by SOI/EOI markers.
Useful for custom FFmpeg pipelines before handing off to `send_video_frame`.
- Optional `video` extras group: `pip install plexus-python[video]` installs
Pillow for non-JPEG input decoding and automatic oversized-frame downsampling.

### Changed

- Frames that would exceed the gateway's 1 MB wire limit are automatically
re-encoded at a proportionally lower quality. A one-time warning is printed
to stderr; subsequent frames are silently clamped.
- `stream_camera` raises `PlexusError` synchronously (before spawning a thread)
when FFmpeg is not found, rather than silently dying in the background.

## [0.4.7] - 2026-05-14 - Video streaming API

### Added
Expand Down
6 changes: 3 additions & 3 deletions plexus/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
px.send("temperature", 72.5)
"""

from plexus.client import Plexus
from plexus.client import Plexus, read_mjpeg_frames
from plexus.ws import WebSocketTransport

__version__ = "0.4.7"
__all__ = ["Plexus", "WebSocketTransport"]
__version__ = "0.4.8"
__all__ = ["Plexus", "WebSocketTransport", "read_mjpeg_frames"]
252 changes: 226 additions & 26 deletions plexus/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,13 @@
import json
import logging
import os
import shutil
import subprocess
import sys
import threading
import time
from contextlib import contextmanager
from typing import Any, Dict, List, Optional, Tuple, Union
from typing import Any, Dict, Generator, List, Optional, Tuple, Union

import requests

Expand Down Expand Up @@ -75,6 +78,36 @@ def _say(line: str) -> None:
# Flexible value type - supports any JSON-serializable value
FlexValue = Union[int, float, str, bool, Dict[str, Any], List[Any]]

_JPEG_SOI = b"\xff\xd8"
_JPEG_EOI = b"\xff\xd9"
_FRAME_JPEG_MAX = 750_000 # gateway is 1MB; base64 × 1.33 + envelope ≈ 998KB at this size


def read_mjpeg_frames(pipe, chunk: int = 65536) -> Generator[bytes, None, None]:
"""Read a raw MJPEG byte stream (e.g. FFmpeg stdout) and yield complete JPEG frames.

Scans for SOI (\xff\xd8) / EOI (\xff\xd9) markers to delimit frames.
Useful when building custom FFmpeg pipelines and handing off bytes to
send_video_frame().
"""
buf = b""
while True:
data = pipe.read(chunk)
if not data:
break
buf += data
while True:
start = buf.find(_JPEG_SOI)
if start == -1:
buf = b""
break
end = buf.find(_JPEG_EOI, start + 2)
if end == -1:
buf = buf[start:] # keep partial frame
break
yield buf[start:end + 2]
buf = buf[end + 2:]


class PlexusError(Exception):
"""Base exception for Plexus errors."""
Expand Down Expand Up @@ -136,6 +169,8 @@ def __init__(
self._session: Optional[requests.Session] = None
self._store_frames: bool = False
self._cv2 = None
self._pil_image = None # lazy PIL.Image import
self._fit_warned: bool = False

if transport not in ("ws", "http"):
raise ValueError(f"transport must be 'ws' or 'http', got {transport!r}")
Expand Down Expand Up @@ -344,50 +379,152 @@ def _on_source_id_assigned(self, assigned: str) -> None:
except Exception as e: # pragma: no cover - persistence failure is non-fatal
logger.debug("failed to persist assigned source_id: %s", e)

def _encode_frame(self, frame, quality: int) -> Tuple[bytes, int, int]:
"""Normalize any supported frame type to (jpeg_bytes, width, height).

Accepted inputs:
- bytes/bytearray: raw JPEG passthrough (magic \\xff\\xd8), or any
Pillow-readable format (PNG, BMP, WebP) which is decoded and re-encoded
- numpy ndarray: encoded via OpenCV (cv2 must be installed)
"""
import io

# --- bytes input ---
if isinstance(frame, (bytes, bytearray)):
if frame[:2] == b"\xff\xd8":
# Already JPEG — passthrough, extract dimensions via Pillow if available
try:
pil = self._get_pil()
img = pil.open(io.BytesIO(frame))
return bytes(frame), img.width, img.height
except Exception:
# Pillow unavailable or unreadable — send as-is, dimensions unknown
return bytes(frame), 0, 0
# Non-JPEG bytes (PNG, BMP, WebP, …) — Pillow decode then re-encode as JPEG
pil = self._get_pil(required=True)
img = pil.open(io.BytesIO(frame))
return self._pil_to_jpeg(img, quality)

# --- numpy array (OpenCV path) ---
if hasattr(frame, "shape"):
cv2 = self._get_cv2(required=True)
height, width = frame.shape[:2]
ok, buf = cv2.imencode(".jpg", frame, [cv2.IMWRITE_JPEG_QUALITY, quality])
if not ok:
raise PlexusError("cv2.imencode failed to encode frame")
return buf.tobytes(), width, height

raise ValueError(
f"Unsupported frame type: {type(frame).__name__}. "
"Expected bytes/bytearray (JPEG or Pillow-readable) or numpy ndarray."
)

def _get_cv2(self, required: bool = False):
if self._cv2 is None:
try:
import cv2 as _cv2
self._cv2 = _cv2
except ImportError as e:
if required:
raise ImportError(
"This frame type requires opencv-python. "
"Install with: pip install opencv-python"
) from e
return self._cv2

def _get_pil(self, required: bool = False):
if self._pil_image is None:
try:
import PIL.Image as _PILImage
self._pil_image = _PILImage
except ImportError as e:
if required:
raise ImportError(
"This frame type requires Pillow. "
"Install with: pip install plexus-python[video]"
) from e
return self._pil_image

def _pil_to_jpeg(self, img, quality: int) -> Tuple[bytes, int, int]:
import io
if img.mode not in ("RGB", "L"):
img = img.convert("RGB")
buf = io.BytesIO()
img.save(buf, format="JPEG", quality=quality)
return buf.getvalue(), img.width, img.height

def _fit_to_wire(self, jpeg_bytes: bytes, requested_quality: int) -> bytes:
"""Re-encode JPEG at lower quality if it would exceed the gateway 1MB limit.

Warns once per Plexus instance so the user sees the issue at startup
without being flooded during a live stream.
"""
import io
if len(jpeg_bytes) <= _FRAME_JPEG_MAX:
return jpeg_bytes
target_quality = max(10, int(requested_quality * _FRAME_JPEG_MAX / len(jpeg_bytes)))
pil = self._get_pil()
if pil is None:
if not self._fit_warned:
self._fit_warned = True
wire_kb = len(jpeg_bytes) * 4 // 3 // 1024
_say(
f"frame too large (~{wire_kb}KB on wire) and Pillow is not installed — "
"install plexus-python[video] to enable automatic downsampling"
)
return jpeg_bytes
try:
img = pil.open(io.BytesIO(jpeg_bytes))
buf = io.BytesIO()
if img.mode not in ("RGB", "L"):
img = img.convert("RGB")
img.save(buf, format="JPEG", quality=target_quality)
result = buf.getvalue()
if not self._fit_warned:
self._fit_warned = True
wire_kb = len(jpeg_bytes) * 4 // 3 // 1024
_say(
f"frame too large (quality={requested_quality}, ~{wire_kb}KB on wire), "
f"re-encoded at quality={target_quality} — lower quality or resolution to silence"
)
return result
except Exception as e:
logger.debug("_fit_to_wire re-encode failed: %s", e)
return jpeg_bytes

def send_video_frame(
self,
frame,
camera_id: str = "camera:0",
quality: int = 85,
timestamp: Optional[float] = None,
) -> bool:
"""
Send a single video frame to Plexus (WebSocket transport only).
"""Send a single video frame to Plexus (WebSocket transport only).

Args:
frame: A numpy array (H, W, C) as returned by cv2.VideoCapture.read()
frame: One of:
- numpy ndarray (H, W, C) — from cv2.VideoCapture or picamera2
- bytes/bytearray — raw JPEG passthrough (zero re-encode), or any
Pillow-readable format (PNG, BMP, WebP) which is decoded and re-encoded
camera_id: Logical camera identifier (e.g. "picam:0", "usb:1")
quality: JPEG compression quality, 1-100. Default 85.
timestamp: Unix timestamp. If not provided, uses current time.
quality: JPEG compression quality, 1-100. Default 85. Also used as the
baseline when adaptive downsampling kicks in for oversized frames.
timestamp: Unix timestamp in seconds. If not provided, uses current time.

Returns:
True if the frame was sent successfully.

Raises:
PlexusError: If transport is not 'ws', or if the send fails.
ImportError: If opencv-python is not installed.

Example:
cap = cv2.VideoCapture(0)
ret, frame = cap.read()
px.send_video_frame(frame, camera_id="picam:0")
PlexusError: If transport is not 'ws'.
ValueError: If frame type is not supported.
ImportError: If a required optional dependency is missing.
"""
if self.transport != "ws":
raise PlexusError("send_video_frame requires transport='ws'")

if self._cv2 is None:
try:
import cv2 as _cv2
self._cv2 = _cv2
except ImportError as e:
raise ImportError(
"send_video_frame requires opencv-python. "
"Install with: pip install opencv-python"
) from e

height, width = frame.shape[:2]
_, buf = self._cv2.imencode(".jpg", frame, [self._cv2.IMWRITE_JPEG_QUALITY, quality])
b64 = base64.b64encode(buf).decode()
jpeg_bytes, width, height = self._encode_frame(frame, quality)
jpeg_bytes = self._fit_to_wire(jpeg_bytes, quality)
b64 = base64.b64encode(jpeg_bytes).decode()

ws = self._ensure_ws()
if not ws.is_authenticated:
Expand All @@ -403,6 +540,69 @@ def send_video_frame(
"timestamp": self._normalize_ts_ms(timestamp),
})

def stream_camera(
self,
url: str,
camera_id: str = "camera:0",
fps: int = 15,
quality: int = 85,
) -> "threading.Event":
"""Stream video from an RTSP URL or file path via FFmpeg (WebSocket only).

Requires FFmpeg to be installed and available on $PATH.

Args:
url: RTSP stream URL (rtsp://...), video file path, or any FFmpeg-supported source.
camera_id: Logical camera identifier forwarded in each frame.
fps: Maximum frames per second to send. Default 15.
quality: JPEG quality for re-encoded frames, 1-100. Default 85.

Returns:
A threading.Event. Call .set() on it to stop streaming.

Raises:
PlexusError: If transport is not 'ws' or FFmpeg is not found.

Example:
stop = px.stream_camera("rtsp://192.168.1.100/stream", camera_id="front:0")
time.sleep(60)
stop.set()
"""
if self.transport != "ws":
raise PlexusError("stream_camera requires transport='ws'")
if shutil.which("ffmpeg") is None:
raise PlexusError(
"FFmpeg not found. Install it: https://ffmpeg.org/download.html"
)

stop_event = threading.Event()

def _run():
cmd = [
"ffmpeg", "-loglevel", "error",
"-i", url,
"-vf", f"fps={fps}",
"-f", "image2pipe",
"-vcodec", "mjpeg",
"pipe:1",
]
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
try:
for jpeg in read_mjpeg_frames(proc.stdout):
if stop_event.is_set():
break
try:
self.send_video_frame(jpeg, camera_id=camera_id, quality=quality)
except Exception as e:
logger.debug("stream_camera send error: %s", e)
finally:
proc.terminate()
proc.wait()

t = threading.Thread(target=_run, daemon=True)
t.start()
return stop_event

def on_command(
self,
name: str,
Expand Down
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "hatchling.build"

[project]
name = "plexus-python"
version = "0.4.7"
version = "0.4.8"
description = "Thin Python SDK for Plexus — send telemetry in one line"
readme = "README.md"
license = "Apache-2.0"
Expand Down Expand Up @@ -33,6 +33,7 @@ dependencies = [
]

[project.optional-dependencies]
video = ["Pillow>=9.0"]
dev = ["pytest", "pytest-cov", "ruff", "websockets>=12"]

[project.scripts]
Expand Down
Loading
Loading