Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 69 additions & 0 deletions .changeset/persistence-layer.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
---
'@tanstack/ai': minor
'@tanstack/ai-client': minor
'@tanstack/ai-angular': minor
'@tanstack/ai-preact': minor
'@tanstack/ai-react': minor
'@tanstack/ai-solid': minor
'@tanstack/ai-svelte': minor
'@tanstack/ai-vue': minor
'@tanstack/ai-persistence': minor
'@tanstack/ai-persistence-sql': minor
'@tanstack/ai-persistence-sqlite': minor
'@tanstack/ai-persistence-postgres': minor
'@tanstack/ai-persistence-cloudflare': minor
'@tanstack/ai-persistence-drizzle': minor
'@tanstack/ai-persistence-prisma': minor
'@tanstack/ai-sandbox-persistence': minor
---

Persistence + resumable runs as composable `chat()` middleware.

`withPersistence(...)` makes any run durable: it loads/saves thread message
history (server-authoritative), creates/updates run records, persists every
AG-UI `StreamChunk` to an append-only public event log, and persists usage. It
is fully **optional** - a `chat()` with no persistence middleware is unchanged.
The primary API is `AIPersistence` / `defineAIPersistence`; `ChatPersistence` /
`defineChatPersistence` remain deprecated compatibility aliases.

**Resume.** Each persisted chunk carries an in-band, opaque `cursor` (a
monotonic per-run sequence). A client that disconnects mid-run reconnects with
`{ threadId, runId, cursor }`; `chat({ cursor })` replays the persisted public
event tail after that cursor. The public chat hooks expose resume state and a
manual `resume()` helper, with automatic resume behavior enabled unless
`autoResume` is opted out.

**Interrupts.** Actionable waits are represented by
`RUN_FINISHED.outcome.type === 'interrupt'` and resumed with AG-UI
`RunAgentInput.resume[]`. Pending user-actionable interrupts block normal new
input on the same thread by default. Approval custom events are legacy
compatibility/projection only.

**Event model.** Public stream events are separate from internal CAS/checkpoint
events. `PublicEventStore` replays the AG-UI `StreamChunk` stream itself;
`InternalEventStore` stores package-owned checkpoints that must not leak into
public replay. Optional feature validation fails loudly when a requested feature
is missing its required stores.

**Backends (shared SQL core + thin adapters).** One SQL implementation behind a
minimal `SqlDriver` (`@tanstack/ai-persistence-sql`), with dialect support for
SQLite, Postgres, and MySQL and backends for SQLite (`-sqlite`,
node:sqlite/better-sqlite3), Postgres (`-postgres`, pg), Cloudflare D1
(`-cloudflare`, with R2-backed `BlobStore`, D1-indexed R2 artifacts, and
concrete Durable Object locks), and bring-your-own Drizzle (`-drizzle`) and
Prisma (`-prisma`). The shared SQL core emits MySQL-safe DDL and conflict
handling (binary key columns, `LONGTEXT` persisted payloads, and no-op
`ON DUPLICATE KEY UPDATE` idempotent inserts). Raw drivers auto-migrate the
small base schema (`runs`, `public_events`, `internal_events`, `messages`,
`interrupts`, `metadata`, `_tanstack_ai_migrations`); ORMs own their schema.
Cloudflare artifact metadata is lazily indexed in D1 while artifact bytes and
generic blobs live in R2, with optional artifact/blob cleanup APIs for deletion
and garbage collection.
`memoryPersistence()` ships in core for tests/examples.

**Sandboxes, MCP, and workflows.** `@tanstack/ai-sandbox-persistence` bridges a
durable SQL-backed `SandboxStore` and the durable `LockStore` into
`withSandbox`, so sandbox resume and ensure-locking survive across processes.
MCP persistence is app-owned metadata plus raw stream replay only. Workflow
extensions are deferred to optional packages and should reuse these primitives
without adding base schema cost.
119 changes: 93 additions & 26 deletions docs/architecture/approval-flow-processing.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ execution until the user explicitly approves or denies the action. This
creates a human-in-the-loop checkpoint for sensitive operations (sending
emails, making purchases, deleting data).

The primary durable wait signal is now the AG-UI terminal event
`RUN_FINISHED.outcome.type === 'interrupt'`. The legacy
`CUSTOM { name: "approval-requested" }` event is kept as a compatibility
projection for older clients and UI state updates, but new persistence and
resume logic should treat actionable waits as interrupts.

The flow spans three layers:

```mermaid
Expand All @@ -55,9 +61,9 @@ flowchart TD

| Layer | Responsibility |
|-------|----------------|
| **Server (TextEngine)** | `chat()` detects `needsApproval` emits CUSTOM `approval-requested` instead of executing the tool |
| **StreamProcessor** | Receives CUSTOM chunk → `updateToolCallApproval()` fires `onApprovalRequest` callback |
| **ChatClient** | Exposes `addToolApprovalResponse()` updates message state → triggers `checkForContinuation()` → sends new stream |
| **Server (TextEngine)** | `chat()` detects `needsApproval` -> emits a `RUN_FINISHED` interrupt outcome and may project legacy `approval-requested` events |
| **StreamProcessor** | Receives interrupt outcome / compatibility custom event -> `updateToolCallApproval()` -> fires `onApprovalRequest` callback |
| **ChatClient** | Exposes `addToolApprovalResponse()` -> updates message state -> sends AG-UI `RunAgentInput.resume[]` entries through the next request |

Framework hooks (`useChat` in React, Solid, Vue, Svelte) delegate to
`ChatClient`, which owns all concurrency and continuation logic.
Expand All @@ -70,15 +76,23 @@ Framework hooks (`useChat` in React, Solid, Vue, Svelte) delegate to

- Runs the agent loop: calls the LLM adapter, accumulates tool calls, executes
tools, and re-invokes the adapter with results.
- When a tool has `needsApproval: true`, the engine emits a
`CUSTOM { name: "approval-requested" }` event instead of executing the tool.
- The stream ends with `RUN_FINISHED { finishReason: "tool_calls" }` so the
client knows tools are pending.
- When a tool has `needsApproval: true`, the engine emits a terminal
`RUN_FINISHED` event with `outcome.type === 'interrupt'` instead of executing
the tool.
- Compatibility `CUSTOM { name: "approval-requested" }` events may still be
emitted or projected so older approval UI can render the same tool-call state.
- Persistence stores pending interrupts and validates the later
`RunAgentInput.resume[]` response before accepting normal new input on the
same thread.

### StreamProcessor (`packages/ai/src/activities/chat/stream/processor.ts`)

- Single source of truth for `UIMessage[]` state.
- On `approval-requested` custom event:
- On `RUN_FINISHED.outcome.type === 'interrupt'`:
1. Persists/surfaces each interrupt as a pending user-actionable wait.
2. Calls `updateToolCallApproval()` for approval-shaped interrupts so legacy
approval UI renders as before.
- On legacy `approval-requested` custom event:
1. Calls `updateToolCallApproval()` to set the tool-call part's state to
`approval-requested` and attach approval metadata.
2. Fires `onApprovalRequest` so the ChatClient can emit devtools events.
Expand Down Expand Up @@ -192,24 +206,31 @@ Step-by-step flow for a single tool requiring approval:
TOOL_CALL_START { toolCallId: "tc-1", toolName: "send_email" }
TOOL_CALL_ARGS { toolCallId: "tc-1", delta: '{"to":"..."}' }
TOOL_CALL_END { toolCallId: "tc-1" }
CUSTOM { name: "approval-requested", value: {
toolCallId: "tc-1",
toolName: "send_email",
input: { to: "..." },
approval: { id: "appr-1", needsApproval: true }
RUN_FINISHED { outcome: {
type: "interrupt",
interrupts: [{
id: "appr-1",
reason: "approval_required",
metadata: {
kind: "approval",
toolCallId: "tc-1",
toolName: "send_email",
input: { to: "..." }
}
}]
}}
RUN_FINISHED { finishReason: "tool_calls" }
```

### 2. StreamProcessor processes the CUSTOM chunk
### 2. StreamProcessor processes the interrupt outcome

```
handleCustomEvent():
handleRunFinished():
1. updateToolCallApproval(messages, messageId, "tc-1", "appr-1")
→ Sets part.state = "approval-requested"
→ Sets part.approval = { id: "appr-1", needsApproval: true }
2. emitMessagesChange()
3. fires onApprovalRequest({ toolCallId, toolName, input, approvalId })
2. records the pending interrupt on the client
3. emitMessagesChange()
4. fires onApprovalRequest({ toolCallId, toolName, input, approvalId })
```

### 3. Stream ends, ChatClient processes
Expand All @@ -233,7 +254,9 @@ addToolApprovalResponse({ id: "appr-1", approved: true }):
→ updateToolCallApprovalResponse():
part.approval.approved = true
part.state = "approval-responded"
2. isLoading is false → call checkForContinuation() directly
2. queue a resume entry:
{ interruptId: "appr-1", status: "resolved", payload: { approved: true } }
3. isLoading is false → call checkForContinuation() directly
```

### 5. Continuation
Expand All @@ -244,8 +267,9 @@ checkForContinuation():
2. shouldAutoSend() → areAllToolsComplete():
part.state === "approval-responded" → true
3. continuationPending = true
4. streamResponse() → new stream to server with approval in messages
5. Server sees approval, executes tool, returns result + LLM response
4. streamResponse() → new stream to server with `resume[]`
5. Persistence validates `resume[]`, resolves the pending interrupt, then the
server executes or cancels the tool and returns result + LLM response
6. continuationPending = false
```

Expand Down Expand Up @@ -388,11 +412,52 @@ Timeline (with fix):

### Approval-related AG-UI events

These are `CUSTOM` events emitted by the TextEngine, not by adapters directly.
The primary wait event is `RUN_FINISHED` with an interrupt outcome:

```typescript ignore
{
type: 'RUN_FINISHED',
outcome: {
type: 'interrupt',
interrupts: [
{
id: string,
reason: 'approval_required',
metadata: {
kind: 'approval',
toolCallId: string,
toolName: string,
input: unknown
}
}
]
}
}
```

Resume uses AG-UI `RunAgentInput.resume[]` on the next request:

```typescript ignore
{
resume: [
{
interruptId: 'appr-1',
status: 'resolved',
payload: { approved: true }
}
]
}
```

Pending user-actionable interrupts block normal input on the same thread by
default. A stale cursor replay may read the public event tail, but a new
non-resume input must resolve the pending interrupts first.

#### `approval-requested`

Emitted when a tool with `needsApproval: true` has its arguments finalized.
Legacy compatibility/projection event emitted when a tool with
`needsApproval: true` has its arguments finalized. Use this to keep older UI
rendering paths working; do not use it as the primary durable wait signal.

```typescript ignore
{
Expand Down Expand Up @@ -422,13 +487,15 @@ TOOL_CALL_START → creates tool-call part (state: awaiting-input)
TOOL_CALL_ARGS* → accumulates arguments (state: input-streaming)
TOOL_CALL_END → finalizes arguments (state: input-complete)
CUSTOM → approval-requested (state: approval-requested)
RUN_FINISHED → finishReason: "tool_calls"
RUN_FINISHED → outcome.type: "interrupt"
```

After the stream ends and the user responds, the ChatClient:
1. Updates the tool-call part (state: `approval-responded`)
2. Sends a new stream request with the full conversation (including approval)
3. The server sees the approval and either executes or cancels the tool
2. Sends a new stream request with `RunAgentInput.resume[]` entries resolving
the pending interrupt
3. Persistence validates the resume entries, resolves the interrupt, and the
server executes or cancels the tool based on the resume payload

---

Expand Down
46 changes: 37 additions & 9 deletions docs/chat/connection-adapters.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ import { useChat, fetchServerSentEvents } from "@tanstack/ai-react";

const { messages } = useChat({
connection: fetchServerSentEvents("/api/chat", {
body: { provider: "openai", model: "gpt-5.1" },
body: { provider: "openai", model: "gpt-5.5" },
}),
});
```
Expand Down Expand Up @@ -178,7 +178,7 @@ import { chatServerFn } from "./server/chat.server";

// `chatServerFn` is an in-process server-side function that synchronously
// returns an AsyncIterable<StreamChunk> — e.g. the result of
// `chat({ adapter, model, messages })` on the server.
// `chat({ adapter: openaiText("gpt-5.5"), messages })` on the server.
const { messages } = useChat({
connection: stream((messages, data) => chatServerFn({ messages, ...data })),
});
Expand All @@ -197,13 +197,28 @@ When you call into your server with an **async** function — the universal case
import { createServerFn } from "@tanstack/react-start";
import { chat, toServerSentEventsResponse } from "@tanstack/ai";
import { openaiText } from "@tanstack/ai-openai";
import type { UIMessage } from "@tanstack/ai";
import type { RunAgentResumeItem, UIMessage } from "@tanstack/ai";

type ChatFnInput = {
messages: Array<UIMessage>;
threadId?: string;
runId?: string;
cursor?: string;
resume?: Array<RunAgentResumeItem>;
};

export const chatFn = createServerFn({ method: "POST" })
.inputValidator((data: { messages: Array<UIMessage> }) => data)
.inputValidator((data: ChatFnInput) => data)
.handler(({ data }) =>
toServerSentEventsResponse(
chat({ adapter: openaiText("gpt-5.1"), messages: data.messages }),
chat({
adapter: openaiText("gpt-5.5"),
messages: data.messages,
threadId: data.threadId,
runId: data.runId,
cursor: data.cursor,
resume: data.resume,
}),
),
);
```
Expand All @@ -213,11 +228,14 @@ import { useChat } from "@tanstack/ai-react";
import { chatFn } from "./server/chat.server";

const { messages, sendMessage } = useChat({
fetcher: ({ messages }, { signal }) => chatFn({ data: { messages }, signal }),
fetcher: (input, { signal }) => chatFn({ data: input, signal }),
});
```

The fetcher receives `{ messages, data, threadId, runId }` plus an `AbortSignal` (triggered by `stop()` or when a send is superseded). Return a `Response` — whose SSE body the chat client parses for you — **or** an `AsyncIterable<StreamChunk>`, which is yielded directly. If your server function returns the stream itself (instead of wrapping it in a `Response`), the fetcher handles that too. Sync and `Promise`-wrapped returns are both accepted.
The fetcher receives `{ messages, data, threadId, runId, cursor, resume }` plus an `AbortSignal` (triggered by `stop()` or when a send is superseded). Forward `cursor` when replaying persisted public events, and forward `resume` when responding to pending AG-UI interrupts. Return a `Response` — whose SSE body the chat client parses for you — **or** an `AsyncIterable<StreamChunk>`, which is yielded directly. If your server function returns the stream itself (instead of wrapping it in a `Response`), the fetcher handles that too. Sync and `Promise`-wrapped returns are both accepted.

For the server persistence path that produces those replay cursors, see
[Resumable Chat](../persistence/resumable-chat).

> **Tip:** The choice between `fetcher` and [`stream()`](#server-functions-and-direct-async-iterables) is about **async vs sync**, not `Response`-vs-iterable — both can yield an `AsyncIterable<StreamChunk>`. `stream()`'s factory must return that iterable **synchronously**, so a server-function call (which returns a `Promise`) won't typecheck there — that's the gap `fetcher` fills ([issue #509](https://github.com/TanStack/ai/issues/509)). Use `stream()` when you can hand back an async iterable synchronously (in-process `chat()`, an RPC client, tests); use `fetcher` for anything you have to `await`. Both normalize to the same request-scoped adapter, so `stop()`/abort, error handling, and tool calls behave identically.

Expand Down Expand Up @@ -315,6 +333,8 @@ function websocketConnection(url: string): SubscribeConnectionAdapter {
JSON.stringify({
threadId: runContext?.threadId,
runId: runContext?.runId,
cursor: runContext?.cursor,
resume: runContext?.resume,
messages,
data,
}),
Expand Down Expand Up @@ -382,6 +402,8 @@ const myAdapter: ConnectConnectionAdapter = {
body: JSON.stringify({
threadId: runContext?.threadId,
runId: runContext?.runId,
cursor: runContext?.cursor,
resume: runContext?.resume,
messages,
...data,
}),
Expand Down Expand Up @@ -415,20 +437,26 @@ const myAdapter: ConnectConnectionAdapter = {
const { messages } = useChat({ connection: myAdapter });
```

`runContext` carries `threadId`, `runId`, `clientTools`, and `forwardedProps`. Include them in your request payload so the server can build an AG-UI-compliant response. If your `connect` stream completes without emitting `RUN_FINISHED`, the runtime synthesizes one for you; if it throws, a `RUN_ERROR` is synthesized.
`runContext` carries `threadId`, `runId`, `cursor`, `resume`, `clientTools`, and `forwardedProps`. Include them in your request payload so the server can build an AG-UI-compliant response, replay from `{ threadId, runId, cursor }`, and validate `RunAgentInput.resume[]` for pending interrupts. If your `connect` stream completes without emitting `RUN_FINISHED`, the runtime synthesizes one for you; if it throws, a `RUN_ERROR` is synthesized.

If the request-scoped adapter talks to a persisted `chat()` endpoint, those
fields are the durable replay identity described in
[Resumable Chat](../persistence/resumable-chat).

## The Adapter Interface

A `ConnectionAdapter` is a union — provide **either** `connect`, **or** both `subscribe` and `send`. Never both modes.

```typescript
import type { UIMessage } from "@tanstack/ai-client";
import type { ModelMessage, StreamChunk } from "@tanstack/ai";
import type { ModelMessage, RunAgentResumeItem, StreamChunk } from "@tanstack/ai";

export interface RunAgentInputContext {
threadId: string;
runId: string;
parentRunId?: string;
cursor?: string;
resume?: Array<RunAgentResumeItem>;
clientTools?: Array<{ name: string; description: string; parameters: unknown }>;
forwardedProps?: Record<string, unknown>;
}
Expand Down
Loading