Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ['3.8', '3.9', '3.10', '3.11', '3.12']
python-version: ['3.10', '3.11', '3.12', '3.13']

steps:
- uses: actions/checkout@v4
Expand Down
64 changes: 52 additions & 12 deletions plexus/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,16 @@
import logging
import os
import shutil
import socket
import subprocess
import sys
import threading
import time
import urllib.error
import urllib.request
from contextlib import contextmanager
from typing import Any, Dict, Generator, List, Optional, Tuple, Union

import requests

from plexus.buffer import BufferBackend, MemoryBuffer, SqliteBuffer
from plexus.config import (
RetryConfig,
Expand All @@ -61,6 +62,46 @@
)
logger = logging.getLogger(__name__)


class _Response:
__slots__ = ("status_code", "text")

def __init__(self, status_code: int, text: str):
self.status_code = status_code
self.text = text


class _Session:
def __init__(self):
self.headers: Dict[str, str] = {}

def post(self, url: str, data: bytes = b"", headers: Optional[Dict[str, str]] = None, timeout: float = 10.0) -> "_Response":
req_headers = {**self.headers, **(headers or {})}
req = urllib.request.Request(url, data=data, headers=req_headers, method="POST")
try:
with urllib.request.urlopen(req, timeout=timeout) as resp:
return _Response(resp.status, resp.read().decode("utf-8", errors="replace"))
except urllib.error.HTTPError as e:
return _Response(e.code, e.read().decode("utf-8", errors="replace"))
except urllib.error.URLError as e:
if isinstance(e.reason, socket.timeout):
raise _Timeout(str(e.reason))
raise _ConnError(str(e.reason))
except (TimeoutError, socket.timeout) as e:
raise _Timeout(str(e))

def close(self) -> None:
pass


class _Timeout(OSError):
pass


class _ConnError(OSError):
pass


# Status messages to stderr so users running `python my_script.py` see what's
# happening without having to configure logging. Set PLEXUS_QUIET=1 to disable.
_QUIET = os.environ.get("PLEXUS_QUIET", "").lower() in ("1", "true", "yes")
Expand Down Expand Up @@ -166,7 +207,7 @@ def __init__(
self._max_buffer_size = max_buffer_size

self._run_id: Optional[str] = None
self._session: Optional[requests.Session] = None
self._session: Optional[_Session] = None
self._store_frames: bool = False
self._cv2 = None
self._pil_image = None # lazy PIL.Image import
Expand Down Expand Up @@ -202,10 +243,9 @@ def max_buffer_size(self, value):
self._max_buffer_size = value
self._buffer._max_size = value

def _get_session(self) -> requests.Session:
"""Get or create a requests session for connection pooling."""
def _get_session(self) -> _Session:
if self._session is None:
self._session = requests.Session()
self._session = _Session()
if self.api_key:
self._session.headers["x-api-key"] = self.api_key
self._session.headers["Content-Type"] = "application/json"
Expand Down Expand Up @@ -732,14 +772,14 @@ def _send_points(self, points: List[Dict[str, Any]]) -> bool:
f"API error: {response.status_code} - {response.text}"
)

except requests.exceptions.Timeout:
except _Timeout:
last_error = PlexusError(f"Request timed out after {self.timeout}s")
if attempt < self.retry_config.max_retries:
time.sleep(self.retry_config.get_delay(attempt))
continue
break

except requests.exceptions.ConnectionError as e:
except _ConnError as e:
last_error = PlexusError(f"Connection failed: {e}")
if attempt < self.retry_config.max_retries:
time.sleep(self.retry_config.get_delay(attempt))
Expand Down Expand Up @@ -839,13 +879,13 @@ def run(self, run_id: str, tags: Optional[Dict[str, str]] = None, store_frames:
try:
self._get_session().post(
f"{self.endpoint}/api/runs",
json={
data=json.dumps({
"run_id": run_id,
"source_id": self.source_id,
"status": "started",
"tags": tags,
"timestamp": (int(time.time() * 1000) + self._clock_offset_ms) / 1000,
},
}).encode("utf-8"),
timeout=self.timeout,
)
except Exception as e:
Expand All @@ -858,12 +898,12 @@ def run(self, run_id: str, tags: Optional[Dict[str, str]] = None, store_frames:
try:
self._get_session().post(
f"{self.endpoint}/api/runs",
json={
data=json.dumps({
"run_id": run_id,
"source_id": self.source_id,
"status": "ended",
"timestamp": (int(time.time() * 1000) + self._clock_offset_ms) / 1000,
},
}).encode("utf-8"),
timeout=self.timeout,
)
except Exception as e:
Expand Down
8 changes: 3 additions & 5 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ version = "0.4.9"
description = "Thin Python SDK for Plexus — send telemetry in one line"
readme = "README.md"
license = "Apache-2.0"
requires-python = ">=3.9"
requires-python = ">=3.10"
authors = [
{ name = "Plexus", email = "hello@plexus.dev" }
]
Expand All @@ -19,21 +19,19 @@ classifiers = [
"License :: OSI Approved :: Apache Software License",
"Operating System :: OS Independent",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Topic :: Scientific/Engineering",
"Topic :: System :: Hardware",
]
dependencies = [
"requests>=2.32.4",
"websocket-client>=1.7",
]

[project.optional-dependencies]
video = ["Pillow>=11.2.1"]
dev = ["pytest>=8.3.5", "pytest-cov", "ruff", "websockets>=12"]
video = ["Pillow>=12.2.0"]
dev = ["pytest>=9.0.3", "pytest-cov", "ruff", "websockets>=12"]

[project.scripts]
plexus = "plexus.cli:main"
Expand Down
Loading
Loading