From 8a48a5ef5d80fa64268f2dfa9b03bf286dac5378 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B8ren=20Blond=20Daugaard?= Date: Mon, 11 May 2026 21:34:29 -0400 Subject: [PATCH 1/2] Make HTTP ingestion async and retrying --- README.md | 6 ++ src/sessionbat/client.py | 10 ++++ src/sessionbat/transports.py | 112 ++++++++++++++++++++++++++++++++++- tests/test_client.py | 100 ++++++++++++++++++++++++++++--- 4 files changed, 218 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index 0b5b162..a099020 100644 --- a/README.md +++ b/README.md @@ -117,6 +117,12 @@ The SDK sends events to SessionBat ingestion by default. Pass `api_key` directly or set `SESSIONBAT_API_KEY`. For tests or local debugging, pass an explicit transport such as `MemoryTransport` or `StdoutTransport`. +HTTP ingestion runs in a background thread so recording observations does not +block your application on network I/O. Transient failures are retried with +bounded backoff, and queued events are flushed on a best-effort basis, for up +to `shutdown_timeout` seconds, during interpreter shutdown. Call `client.flush()` +or `client.close()` when you need to wait for delivery before exiting a short-lived process. 
+ ### `Session` A `Session` records completed observations: diff --git a/src/sessionbat/client.py b/src/sessionbat/client.py index fc92ccb..0f0e348 100644 --- a/src/sessionbat/client.py +++ b/src/sessionbat/client.py @@ -73,6 +73,16 @@ def _send(self, payload: dict[str, Any]) -> None: assert self.transport is not None self.transport.send(payload) + def flush(self, timeout: float | None = None) -> bool: + if self.transport is None or not hasattr(self.transport, "flush"): + return True + return bool(self.transport.flush(timeout=timeout)) + + def close(self, timeout: float | None = None) -> bool: + if self.transport is None or not hasattr(self.transport, "close"): + return True + return bool(self.transport.close(timeout=timeout)) + def langchain_callback( self, *, diff --git a/src/sessionbat/transports.py b/src/sessionbat/transports.py index e656246..05599f7 100644 --- a/src/sessionbat/transports.py +++ b/src/sessionbat/transports.py @@ -1,8 +1,13 @@ from __future__ import annotations +import atexit import json +import random import sys -from dataclasses import dataclass +import threading +import time +from dataclasses import dataclass, field +from queue import Empty, Full, Queue from typing import Protocol from urllib.error import HTTPError, URLError from urllib.request import Request, urlopen @@ -23,8 +28,88 @@ class IngestionTransport: api_key: str endpoint: str = DEFAULT_INGESTION_ENDPOINT timeout: float = 10.0 + max_retries: int = 3 + base_backoff: float = 0.25 + max_backoff: float = 2.0 + queue_size: int = 1000 + shutdown_timeout: float = 2.0 + _queue: Queue[dict] = field(init=False, repr=False) + _worker: threading.Thread | None = field(default=None, init=False, repr=False) + _closed: bool = field(default=False, init=False, repr=False) + _lock: threading.Lock = field(default_factory=threading.Lock, init=False, repr=False) + + def __post_init__(self) -> None: + self._queue = Queue(maxsize=self.queue_size) + atexit.register(self.close, 
timeout=self.shutdown_timeout) def send(self, payload: dict) -> None: + with self._lock: + if self._closed: + return + self._ensure_worker_started() + + try: + self._queue.put_nowait(payload) + except Full: + return + + def flush(self, timeout: float | None = None) -> bool: + deadline = None if timeout is None else time.monotonic() + timeout + while self._queue.unfinished_tasks: + if deadline is not None and time.monotonic() >= deadline: + return False + time.sleep(0.01) + return True + + def close(self, timeout: float | None = None) -> bool: + deadline = None if timeout is None else time.monotonic() + timeout + with self._lock: + worker = self._worker + if not self._closed: + self._closed = True + + flushed = self.flush(timeout=timeout) + if worker is None: + return flushed + + remaining = None if deadline is None else max(0.0, deadline - time.monotonic()) + worker.join(timeout=remaining) + return flushed and not worker.is_alive() + + def _ensure_worker_started(self) -> None: + if self._worker is not None: + return + self._worker = threading.Thread(target=self._run, name="sessionbat-ingestion") + self._worker.daemon = True + self._worker.start() + + def _run(self) -> None: + while True: + try: + payload = self._queue.get(timeout=0.1) + except Empty: + if self._closed: + return + continue + + try: + self._send_with_retries(payload) + finally: + self._queue.task_done() + + def _send_with_retries(self, payload: dict) -> None: + attempt = 0 + while True: + try: + self._send_once(payload) + return + except TransportError as error: + if attempt >= self.max_retries or not _is_retryable(error): + return + time.sleep(_backoff(attempt, self.base_backoff, self.max_backoff)) + attempt += 1 + + def _send_once(self, payload: dict) -> None: body = json.dumps(payload).encode("utf-8") request = Request( self.endpoint, @@ -41,11 +126,32 @@ def send(self, payload: dict) -> None: with urlopen(request, timeout=self.timeout) as response: status = response.status if status < 200 or 
status >= 300: - raise TransportError(f"ingestion failed with HTTP {status}") + raise TransportError(f"ingestion failed with HTTP {status}", status) except HTTPError as error: - raise TransportError(f"ingestion failed with HTTP {error.code}") from error + raise TransportError(f"ingestion failed with HTTP {error.code}", error.code) from error except URLError as error: raise TransportError(f"ingestion request failed: {error.reason}") from error + except TimeoutError as error: + raise TransportError("ingestion request timed out") from error + except OSError as error: + raise TransportError(f"ingestion request failed: {error}") from error + + +def _is_retryable(error: TransportError) -> bool: + if len(error.args) > 1 and isinstance(error.args[1], int): + status = error.args[1] + return status == 408 or status == 429 or status >= 500 + cause = error.__cause__ + if isinstance(cause, HTTPError): + return cause.code == 408 or cause.code == 429 or cause.code >= 500 + if isinstance(cause, URLError | TimeoutError | OSError): + return True + return False + + +def _backoff(attempt: int, base_backoff: float, max_backoff: float) -> float: + delay = min(max_backoff, base_backoff * (2**attempt)) + return random.uniform(0, delay) class StdoutTransport: diff --git a/tests/test_client.py b/tests/test_client.py index d1b43e0..e10698d 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -2,6 +2,7 @@ import json import threading +import time from collections.abc import Iterator from datetime import datetime from http.server import BaseHTTPRequestHandler, HTTPServer @@ -9,7 +10,7 @@ import pytest from sessionbat import SessionBat -from sessionbat.transports import IngestionTransport, MemoryTransport, TransportError +from sessionbat.transports import IngestionTransport, MemoryTransport class TestSessionBatClient: @@ -131,11 +132,16 @@ def test_records_errors_on_failed_operations(self) -> None: class _RecordingHandler(BaseHTTPRequestHandler): requests: list[dict] = [] - 
response_status = 202 + response_status: int | list[int] = 202 def do_POST(self) -> None: length = int(self.headers["Content-Length"]) body = self.rfile.read(length) + response_status = self.__class__.response_status + if isinstance(response_status, list): + status = response_status.pop(0) if response_status else 202 + else: + status = response_status self.__class__.requests.append( { "path": self.path, @@ -143,7 +149,7 @@ def do_POST(self) -> None: "body": json.loads(body), } ) - self.send_response(self.__class__.response_status) + self.send_response(status) self.send_header("Content-Type", "application/json") self.end_headers() self.wfile.write(b'{"ok":true}') @@ -195,6 +201,7 @@ def test_uses_environment_api_key( session = client.session(session_id="thread_123") session.tool_call(tool_name="lookup_account", input={"account_id": "acct_123"}) + assert client.flush(timeout=1.0) request = _RecordingHandler.requests[0] assert request["headers"]["Authorization"] == "Bearer sbat_ingest_env" @@ -212,6 +219,7 @@ def test_posts_sdk_payload_with_bearer_auth(self, ingestion_server: str) -> None output={"status": "locked"}, ) + assert client.flush(timeout=1.0) request = _RecordingHandler.requests[0] payload = request["body"] assert request["path"] == "/api/v1/ingestion/events" @@ -228,9 +236,87 @@ def test_posts_sdk_payload_with_bearer_auth(self, ingestion_server: str) -> None assert payload["observation"]["input"] == {"account_id": "acct_123"} assert payload["observation"]["output"] == {"status": "locked"} - def test_raises_transport_error_for_non_2xx_response(self, ingestion_server: str) -> None: - _RecordingHandler.response_status = 500 + def test_retries_transient_http_failures(self, ingestion_server: str) -> None: + _RecordingHandler.response_status = [500, 500, 202] + transport = IngestionTransport( + api_key="sbat_ingest_test", + endpoint=ingestion_server, + base_backoff=0, + max_backoff=0, + ) + + transport.send({"id": "evt_123"}) + + assert 
transport.flush(timeout=1.0) + assert [request["body"]["id"] for request in _RecordingHandler.requests] == [ + "evt_123", + "evt_123", + "evt_123", + ] + + def test_does_not_retry_or_raise_for_non_retryable_http_failures( + self, ingestion_server: str + ) -> None: + _RecordingHandler.response_status = 400 transport = IngestionTransport(api_key="sbat_ingest_test", endpoint=ingestion_server) - with pytest.raises(TransportError, match="HTTP 500"): - transport.send({"id": "evt_123"}) + transport.send({"id": "evt_123"}) + + assert transport.flush(timeout=1.0) + assert len(_RecordingHandler.requests) == 1 + + def test_send_does_not_raise_for_network_failures(self) -> None: + transport = IngestionTransport( + api_key="sbat_ingest_test", + endpoint="http://127.0.0.1:1/api/v1/ingestion/events", + base_backoff=0, + max_backoff=0, + timeout=0.01, + ) + + transport.send({"id": "evt_123"}) + + assert transport.flush(timeout=1.0) + + def test_full_queue_drops_newest_without_blocking( + self, monkeypatch: pytest.MonkeyPatch + ) -> None: + started = threading.Event() + release = threading.Event() + sent: list[str] = [] + + def slow_send_once(self: IngestionTransport, payload: dict) -> None: + sent.append(payload["id"]) + started.set() + release.wait(timeout=1.0) + + monkeypatch.setattr(IngestionTransport, "_send_once", slow_send_once) + transport = IngestionTransport(api_key="sbat_ingest_test", queue_size=1) + + transport.send({"id": "evt_1"}) + assert started.wait(timeout=1.0) + transport.send({"id": "evt_2"}) + start = time.monotonic() + transport.send({"id": "evt_3"}) + + assert time.monotonic() - start < 0.1 + release.set() + assert transport.close(timeout=1.0) + assert sent == ["evt_1", "evt_2"] + + def test_flush_returns_false_when_timeout_expires( + self, monkeypatch: pytest.MonkeyPatch + ) -> None: + release = threading.Event() + + def slow_send_once(self: IngestionTransport, payload: dict) -> None: + release.wait(timeout=1.0) + + monkeypatch.setattr(IngestionTransport, 
"_send_once", slow_send_once) + transport = IngestionTransport(api_key="sbat_ingest_test") + + transport.send({"id": "evt_123"}) + + assert transport.flush(timeout=0.01) is False + release.set() + assert transport.close(timeout=1.0) From 087e25edf9470c83faab3e075f1f107196bd9065 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B8ren=20Blond=20Daugaard?= Date: Tue, 12 May 2026 20:52:15 -0400 Subject: [PATCH 2/2] Harden async ingestion worker --- src/sessionbat/transports.py | 10 +++-- tests/test_client.py | 74 ++++++++++++++++++++++++++++++++++++ 2 files changed, 80 insertions(+), 4 deletions(-) diff --git a/src/sessionbat/transports.py b/src/sessionbat/transports.py index 05599f7..ac92321 100644 --- a/src/sessionbat/transports.py +++ b/src/sessionbat/transports.py @@ -48,10 +48,10 @@ def send(self, payload: dict) -> None: return self._ensure_worker_started() - try: - self._queue.put_nowait(payload) - except Full: - return + try: + self._queue.put_nowait(payload) + except Full: + return def flush(self, timeout: float | None = None) -> bool: deadline = None if timeout is None else time.monotonic() + timeout @@ -108,6 +108,8 @@ def _send_with_retries(self, payload: dict) -> None: return time.sleep(_backoff(attempt, self.base_backoff, self.max_backoff)) attempt += 1 + except Exception: + return def _send_once(self, payload: dict) -> None: body = json.dumps(payload).encode("utf-8") diff --git a/tests/test_client.py b/tests/test_client.py index e10698d..783e667 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -6,6 +6,7 @@ from collections.abc import Iterator from datetime import datetime from http.server import BaseHTTPRequestHandler, HTTPServer +from queue import Queue import pytest @@ -278,6 +279,79 @@ def test_send_does_not_raise_for_network_failures(self) -> None: assert transport.flush(timeout=1.0) + def test_unexpected_send_failure_does_not_stop_worker( + self, monkeypatch: pytest.MonkeyPatch + ) -> None: + sent: list[str] = [] + + def send_once(self: 
IngestionTransport, payload: dict) -> None: + if payload["id"] == "evt_bad": + raise TypeError("not JSON serializable") + sent.append(payload["id"]) + + monkeypatch.setattr(IngestionTransport, "_send_once", send_once) + transport = IngestionTransport(api_key="sbat_ingest_test") + + transport.send({"id": "evt_bad"}) + transport.send({"id": "evt_good"}) + + assert transport.flush(timeout=1.0) + assert sent == ["evt_good"] + assert transport.close(timeout=1.0) + + def test_close_waits_for_event_accepted_during_send( + self, monkeypatch: pytest.MonkeyPatch + ) -> None: + sent: list[str] = [] + + class BlockingPutQueue: + def __init__(self) -> None: + self.inner: Queue[dict] = Queue() + self.put_started = threading.Event() + self.release_put = threading.Event() + + @property + def unfinished_tasks(self) -> int: + return self.inner.unfinished_tasks + + def put_nowait(self, payload: dict) -> None: + self.put_started.set() + self.release_put.wait(timeout=1.0) + self.inner.put_nowait(payload) + + def get(self, timeout: float) -> dict: + return self.inner.get(timeout=timeout) + + def task_done(self) -> None: + self.inner.task_done() + + def send_once(self: IngestionTransport, payload: dict) -> None: + sent.append(payload["id"]) + + monkeypatch.setattr(IngestionTransport, "_send_once", send_once) + transport = IngestionTransport(api_key="sbat_ingest_test") + queue = BlockingPutQueue() + transport._queue = queue + close_result: list[bool] = [] + + send_thread = threading.Thread(target=transport.send, args=({"id": "evt_123"},)) + send_thread.start() + assert queue.put_started.wait(timeout=1.0) + + close_thread = threading.Thread( + target=lambda: close_result.append(transport.close(timeout=1.0)) + ) + close_thread.start() + time.sleep(0.05) + + assert close_thread.is_alive() + queue.release_put.set() + send_thread.join(timeout=1.0) + close_thread.join(timeout=1.0) + + assert close_result == [True] + assert sent == ["evt_123"] + def test_full_queue_drops_newest_without_blocking( 
self, monkeypatch: pytest.MonkeyPatch ) -> None: