diff --git a/.changeset/persistence-layer.md b/.changeset/persistence-layer.md new file mode 100644 index 000000000..8ed225f80 --- /dev/null +++ b/.changeset/persistence-layer.md @@ -0,0 +1,69 @@ +--- +'@tanstack/ai': minor +'@tanstack/ai-client': minor +'@tanstack/ai-angular': minor +'@tanstack/ai-preact': minor +'@tanstack/ai-react': minor +'@tanstack/ai-solid': minor +'@tanstack/ai-svelte': minor +'@tanstack/ai-vue': minor +'@tanstack/ai-persistence': minor +'@tanstack/ai-persistence-sql': minor +'@tanstack/ai-persistence-sqlite': minor +'@tanstack/ai-persistence-postgres': minor +'@tanstack/ai-persistence-cloudflare': minor +'@tanstack/ai-persistence-drizzle': minor +'@tanstack/ai-persistence-prisma': minor +'@tanstack/ai-sandbox-persistence': minor +--- + +Persistence + resumable runs as composable `chat()` middleware. + +`withPersistence(...)` makes any run durable: it loads/saves thread message +history (server-authoritative), creates/updates run records, persists every +AG-UI `StreamChunk` to an append-only public event log, and persists usage. It +is fully **optional** - a `chat()` with no persistence middleware is unchanged. +The primary API is `AIPersistence` / `defineAIPersistence`; `ChatPersistence` / +`defineChatPersistence` remain deprecated compatibility aliases. + +**Resume.** Each persisted chunk carries an in-band, opaque `cursor` (a +monotonic per-run sequence). A client that disconnects mid-run reconnects with +`{ threadId, runId, cursor }`; `chat({ cursor })` replays the persisted public +event tail after that cursor. The public chat hooks expose resume state and a +manual `resume()` helper, with automatic resume behavior enabled unless +`autoResume` is opted out. + +**Interrupts.** Actionable waits are represented by +`RUN_FINISHED.outcome.type === 'interrupt'` and resumed with AG-UI +`RunAgentInput.resume[]`. Pending user-actionable interrupts block normal new +input on the same thread by default. Approval custom events are legacy +compatibility/projection only. + +**Event model.** Public stream events are separate from internal CAS/checkpoint +events. `PublicEventStore` replays the AG-UI `StreamChunk` stream itself; +`InternalEventStore` stores package-owned checkpoints that must not leak into +public replay. Optional feature validation fails loudly when a requested feature +is missing its required stores. + +**Backends (shared SQL core + thin adapters).** One SQL implementation behind a +minimal `SqlDriver` (`@tanstack/ai-persistence-sql`), with dialect support for +SQLite, Postgres, and MySQL and backends for SQLite (`-sqlite`, +node:sqlite/better-sqlite3), Postgres (`-postgres`, pg), Cloudflare D1 +(`-cloudflare`, with R2-backed `BlobStore`, D1-indexed R2 artifacts, and +concrete Durable Object locks), and bring-your-own Drizzle (`-drizzle`) and +Prisma (`-prisma`). The shared SQL core emits MySQL-safe DDL and conflict +handling (binary key columns, `LONGTEXT` persisted payloads, and no-op +`ON DUPLICATE KEY UPDATE` idempotent inserts). Raw drivers auto-migrate the +small base schema (`runs`, `public_events`, `internal_events`, `messages`, +`interrupts`, `metadata`, `_tanstack_ai_migrations`); ORMs own their schema. +Cloudflare artifact metadata is lazily indexed in D1 while artifact bytes and +generic blobs live in R2, with optional artifact/blob cleanup APIs for deletion +and garbage collection. +`memoryPersistence()` ships in core for tests/examples. + +**Sandboxes, MCP, and workflows.** `@tanstack/ai-sandbox-persistence` bridges a +durable SQL-backed `SandboxStore` and the durable `LockStore` into +`withSandbox`, so sandbox resume and ensure-locking survive across processes. +MCP persistence is app-owned metadata plus raw stream replay only. Workflow +extensions are deferred to optional packages and should reuse these primitives +without adding base schema cost. diff --git a/docs/architecture/approval-flow-processing.md b/docs/architecture/approval-flow-processing.md index 04632bbe6..aa42c9aa1 100644 --- a/docs/architecture/approval-flow-processing.md +++ b/docs/architecture/approval-flow-processing.md @@ -41,6 +41,12 @@ execution until the user explicitly approves or denies the action. This creates a human-in-the-loop checkpoint for sensitive operations (sending emails, making purchases, deleting data). +The primary durable wait signal is now the AG-UI terminal event +`RUN_FINISHED.outcome.type === 'interrupt'`. The legacy +`CUSTOM { name: "approval-requested" }` event is kept as a compatibility +projection for older clients and UI state updates, but new persistence and +resume logic should treat actionable waits as interrupts. + The flow spans three layers: ```mermaid @@ -55,9 +61,9 @@ flowchart TD | Layer | Responsibility | |-------|----------------| -| **Server (TextEngine)** | `chat()` detects `needsApproval` → emits CUSTOM `approval-requested` instead of executing the tool | -| **StreamProcessor** | Receives CUSTOM chunk → `updateToolCallApproval()` → fires `onApprovalRequest` callback | -| **ChatClient** | Exposes `addToolApprovalResponse()` → updates message state → triggers `checkForContinuation()` → sends new stream | +| **Server (TextEngine)** | `chat()` detects `needsApproval` -> emits a `RUN_FINISHED` interrupt outcome and may project legacy `approval-requested` events | +| **StreamProcessor** | Receives interrupt outcome / compatibility custom event -> `updateToolCallApproval()` -> fires `onApprovalRequest` callback | +| **ChatClient** | Exposes `addToolApprovalResponse()` -> updates message state -> sends AG-UI `RunAgentInput.resume[]` entries through the next request | Framework hooks (`useChat` in React, Solid, Vue, Svelte) delegate to `ChatClient`, which owns all concurrency and continuation logic. @@ -70,15 +76,23 @@ Framework hooks (`useChat` in React, Solid, Vue, Svelte) delegate to - Runs the agent loop: calls the LLM adapter, accumulates tool calls, executes tools, and re-invokes the adapter with results. -- When a tool has `needsApproval: true`, the engine emits a - `CUSTOM { name: "approval-requested" }` event instead of executing the tool. -- The stream ends with `RUN_FINISHED { finishReason: "tool_calls" }` so the - client knows tools are pending. +- When a tool has `needsApproval: true`, the engine emits a terminal + `RUN_FINISHED` event with `outcome.type === 'interrupt'` instead of executing + the tool. +- Compatibility `CUSTOM { name: "approval-requested" }` events may still be + emitted or projected so older approval UI can render the same tool-call state. +- Persistence stores pending interrupts and validates the later + `RunAgentInput.resume[]` response before accepting normal new input on the + same thread. ### StreamProcessor (`packages/ai/src/activities/chat/stream/processor.ts`) - Single source of truth for `UIMessage[]` state. -- On `approval-requested` custom event: +- On `RUN_FINISHED.outcome.type === 'interrupt'`: + 1. Persists/surfaces each interrupt as a pending user-actionable wait. + 2. Calls `updateToolCallApproval()` for approval-shaped interrupts so legacy + approval UI renders as before. +- On legacy `approval-requested` custom event: 1. Calls `updateToolCallApproval()` to set the tool-call part's state to `approval-requested` and attach approval metadata. 2. Fires `onApprovalRequest` so the ChatClient can emit devtools events. @@ -192,24 +206,31 @@ Step-by-step flow for a single tool requiring approval: TOOL_CALL_START { toolCallId: "tc-1", toolName: "send_email" } TOOL_CALL_ARGS { toolCallId: "tc-1", delta: '{"to":"..."}' } TOOL_CALL_END { toolCallId: "tc-1" } -CUSTOM { name: "approval-requested", value: { - toolCallId: "tc-1", - toolName: "send_email", - input: { to: "..." }, - approval: { id: "appr-1", needsApproval: true } +RUN_FINISHED { outcome: { + type: "interrupt", + interrupts: [{ + id: "appr-1", + reason: "approval_required", + metadata: { + kind: "approval", + toolCallId: "tc-1", + toolName: "send_email", + input: { to: "..." } + } + }] }} -RUN_FINISHED { finishReason: "tool_calls" } ``` -### 2. StreamProcessor processes the CUSTOM chunk +### 2. StreamProcessor processes the interrupt outcome ``` -handleCustomEvent(): +handleRunFinished(): 1. updateToolCallApproval(messages, messageId, "tc-1", "appr-1") → Sets part.state = "approval-requested" → Sets part.approval = { id: "appr-1", needsApproval: true } - 2. emitMessagesChange() - 3. fires onApprovalRequest({ toolCallId, toolName, input, approvalId }) + 2. records the pending interrupt on the client + 3. emitMessagesChange() + 4. fires onApprovalRequest({ toolCallId, toolName, input, approvalId }) ``` ### 3. Stream ends, ChatClient processes @@ -233,7 +254,9 @@ addToolApprovalResponse({ id: "appr-1", approved: true }): → updateToolCallApprovalResponse(): part.approval.approved = true part.state = "approval-responded" - 2. isLoading is false → call checkForContinuation() directly + 2. queue a resume entry: + { interruptId: "appr-1", status: "resolved", payload: { approved: true } } + 3. isLoading is false → call checkForContinuation() directly ``` ### 5. Continuation @@ -244,8 +267,9 @@ checkForContinuation(): 2. shouldAutoSend() → areAllToolsComplete(): part.state === "approval-responded" → true 3. continuationPending = true - 4. streamResponse() → new stream to server with approval in messages - 5. Server sees approval, executes tool, returns result + LLM response + 4. streamResponse() → new stream to server with `resume[]` + 5. Persistence validates `resume[]`, resolves the pending interrupt, then the + server executes or cancels the tool and returns result + LLM response 6. continuationPending = false ``` @@ -388,11 +412,52 @@ Timeline (with fix): ### Approval-related AG-UI events -These are `CUSTOM` events emitted by the TextEngine, not by adapters directly. +The primary wait event is `RUN_FINISHED` with an interrupt outcome: + +```typescript ignore +{ + type: 'RUN_FINISHED', + outcome: { + type: 'interrupt', + interrupts: [ + { + id: string, + reason: 'approval_required', + metadata: { + kind: 'approval', + toolCallId: string, + toolName: string, + input: unknown + } + } + ] + } +} +``` + +Resume uses AG-UI `RunAgentInput.resume[]` on the next request: + +```typescript ignore +{ + resume: [ + { + interruptId: 'appr-1', + status: 'resolved', + payload: { approved: true } + } + ] +} +``` + +Pending user-actionable interrupts block normal input on the same thread by +default. A stale cursor replay may read the public event tail, but a new +non-resume input must resolve the pending interrupts first. #### `approval-requested` -Emitted when a tool with `needsApproval: true` has its arguments finalized. +Legacy compatibility/projection event emitted when a tool with +`needsApproval: true` has its arguments finalized. Use this to keep older UI +rendering paths working; do not use it as the primary durable wait signal. ```typescript ignore { @@ -422,13 +487,15 @@ TOOL_CALL_START → creates tool-call part (state: awaiting-input) TOOL_CALL_ARGS* → accumulates arguments (state: input-streaming) TOOL_CALL_END → finalizes arguments (state: input-complete) CUSTOM → approval-requested (state: approval-requested) -RUN_FINISHED → finishReason: "tool_calls" +RUN_FINISHED → outcome.type: "interrupt" ``` After the stream ends and the user responds, the ChatClient: 1. Updates the tool-call part (state: `approval-responded`) -2. Sends a new stream request with the full conversation (including approval) -3. The server sees the approval and either executes or cancels the tool +2. Sends a new stream request with `RunAgentInput.resume[]` entries resolving + the pending interrupt +3. Persistence validates the resume entries, resolves the interrupt, and the + server executes or cancels the tool based on the resume payload --- diff --git a/docs/chat/connection-adapters.md b/docs/chat/connection-adapters.md index a67e17e6f..c5e638d0d 100644 --- a/docs/chat/connection-adapters.md +++ b/docs/chat/connection-adapters.md @@ -73,7 +73,7 @@ import { useChat, fetchServerSentEvents } from "@tanstack/ai-react"; const { messages } = useChat({ connection: fetchServerSentEvents("/api/chat", { - body: { provider: "openai", model: "gpt-5.1" }, + body: { provider: "openai", model: "gpt-5.5" }, }), }); ``` @@ -178,7 +178,7 @@ import { chatServerFn } from "./server/chat.server"; // `chatServerFn` is an in-process server-side function that synchronously // returns an AsyncIterable — e.g. the result of -// `chat({ adapter, model, messages })` on the server. +// `chat({ adapter: openaiText("gpt-5.5"), messages })` on the server. const { messages } = useChat({ connection: stream((messages, data) => chatServerFn({ messages, ...data })), }); @@ -197,13 +197,28 @@ When you call into your server with an **async** function — the universal case import { createServerFn } from "@tanstack/react-start"; import { chat, toServerSentEventsResponse } from "@tanstack/ai"; import { openaiText } from "@tanstack/ai-openai"; -import type { UIMessage } from "@tanstack/ai"; +import type { RunAgentResumeItem, UIMessage } from "@tanstack/ai"; + +type ChatFnInput = { + messages: Array; + threadId?: string; + runId?: string; + cursor?: string; + resume?: Array; +}; export const chatFn = createServerFn({ method: "POST" }) - .inputValidator((data: { messages: Array }) => data) + .inputValidator((data: ChatFnInput) => data) .handler(({ data }) => toServerSentEventsResponse( - chat({ adapter: openaiText("gpt-5.1"), messages: data.messages }), + chat({ + adapter: openaiText("gpt-5.5"), + messages: data.messages, + threadId: data.threadId, + runId: data.runId, + cursor: data.cursor, + resume: data.resume, + }), ), ); ``` @@ -213,11 +228,14 @@ import { useChat } from "@tanstack/ai-react"; import { chatFn } from "./server/chat.server"; const { messages, sendMessage } = useChat({ - fetcher: ({ messages }, { signal }) => chatFn({ data: { messages }, signal }), + fetcher: (input, { signal }) => chatFn({ data: input, signal }), }); ``` -The fetcher receives `{ messages, data, threadId, runId }` plus an `AbortSignal` (triggered by `stop()` or when a send is superseded). Return a `Response` — whose SSE body the chat client parses for you — **or** an `AsyncIterable`, which is yielded directly. If your server function returns the stream itself (instead of wrapping it in a `Response`), the fetcher handles that too. Sync and `Promise`-wrapped returns are both accepted. +The fetcher receives `{ messages, data, threadId, runId, cursor, resume }` plus an `AbortSignal` (triggered by `stop()` or when a send is superseded). Forward `cursor` when replaying persisted public events, and forward `resume` when responding to pending AG-UI interrupts. Return a `Response` — whose SSE body the chat client parses for you — **or** an `AsyncIterable`, which is yielded directly. If your server function returns the stream itself (instead of wrapping it in a `Response`), the fetcher handles that too. Sync and `Promise`-wrapped returns are both accepted. + +For the server persistence path that produces those replay cursors, see +[Resumable Chat](../persistence/resumable-chat). > **Tip:** The choice between `fetcher` and [`stream()`](#server-functions-and-direct-async-iterables) is about **async vs sync**, not `Response`-vs-iterable — both can yield an `AsyncIterable`. `stream()`'s factory must return that iterable **synchronously**, so a server-function call (which returns a `Promise`) won't typecheck there — that's the gap `fetcher` fills ([issue #509](https://github.com/TanStack/ai/issues/509)). Use `stream()` when you can hand back an async iterable synchronously (in-process `chat()`, an RPC client, tests); use `fetcher` for anything you have to `await`. Both normalize to the same request-scoped adapter, so `stop()`/abort, error handling, and tool calls behave identically. @@ -315,6 +333,8 @@ function websocketConnection(url: string): SubscribeConnectionAdapter { JSON.stringify({ threadId: runContext?.threadId, runId: runContext?.runId, + cursor: runContext?.cursor, + resume: runContext?.resume, messages, data, }), @@ -382,6 +402,8 @@ const myAdapter: ConnectConnectionAdapter = { body: JSON.stringify({ threadId: runContext?.threadId, runId: runContext?.runId, + cursor: runContext?.cursor, + resume: runContext?.resume, messages, ...data, }), @@ -415,7 +437,11 @@ const myAdapter: ConnectConnectionAdapter = { const { messages } = useChat({ connection: myAdapter }); ``` -`runContext` carries `threadId`, `runId`, `clientTools`, and `forwardedProps`. Include them in your request payload so the server can build an AG-UI-compliant response. If your `connect` stream completes without emitting `RUN_FINISHED`, the runtime synthesizes one for you; if it throws, a `RUN_ERROR` is synthesized. +`runContext` carries `threadId`, `runId`, `cursor`, `resume`, `clientTools`, and `forwardedProps`. Include them in your request payload so the server can build an AG-UI-compliant response, replay from `{ threadId, runId, cursor }`, and validate `RunAgentInput.resume[]` for pending interrupts. If your `connect` stream completes without emitting `RUN_FINISHED`, the runtime synthesizes one for you; if it throws, a `RUN_ERROR` is synthesized. + +If the request-scoped adapter talks to a persisted `chat()` endpoint, those +fields are the durable replay identity described in +[Resumable Chat](../persistence/resumable-chat). ## The Adapter Interface @@ -423,12 +449,14 @@ A `ConnectionAdapter` is a union — provide **either** `connect`, **or** both ` ```typescript import type { UIMessage } from "@tanstack/ai-client"; -import type { ModelMessage, StreamChunk } from "@tanstack/ai"; +import type { ModelMessage, RunAgentResumeItem, StreamChunk } from "@tanstack/ai"; export interface RunAgentInputContext { threadId: string; runId: string; parentRunId?: string; + cursor?: string; + resume?: Array; clientTools?: Array<{ name: string; description: string; parameters: unknown }>; forwardedProps?: Record; } diff --git a/docs/chat/persistence.md b/docs/chat/persistence.md index 985c8aa2d..a53f08be0 100644 --- a/docs/chat/persistence.md +++ b/docs/chat/persistence.md @@ -17,6 +17,11 @@ By default a `ChatClient` (and every framework `useChat`/`createChat` wrapper) k This is especially useful for SPAs, Electron apps, and offline-first setups where the client is the source of truth and there's no server managing conversation state. +If your server owns the transcript and you need durable event replay after a +reconnect or reload, use server-side persistence instead. See +[Resumable Chat](../persistence/resumable-chat) for the full `withPersistence` +server/client wiring. + ## The adapter interface A persistence adapter is any object with three methods — the same `getItem`/`setItem`/`removeItem` shape used elsewhere in TanStack AI. Each method may be synchronous or return a `Promise`: @@ -46,19 +51,21 @@ import { adapter, myPersistenceAdapter } from "./chat-setup"; const client = new ChatClient({ id: "conversation-123", connection: adapter, - persistence: myPersistenceAdapter, + persistence: { + client: myPersistenceAdapter, + }, }); ``` ## What the client does for you -When a `persistence` adapter is provided, `ChatClient`: +When a `persistence.client` adapter is provided, `ChatClient`: - **Hydrates on construction** — calls `getItem(id)`. If it returns an array, those messages populate the client (overriding `initialMessages`). Async adapters hydrate as soon as the promise resolves, unless you've already started a new conversation in the meantime. - **Saves on every change** — calls `setItem(id, messages)` whenever the message list changes (new user message, streamed assistant content, tool calls/results, approval responses). Writes are queued so they never overlap or land out of order. - **Clears on `clear()`** — calls `removeItem(id)` and discards any in-flight stream so a cleared conversation doesn't get repopulated by late chunks. -When `persistence` is omitted, nothing changes — the client behaves exactly as before. The option is fully backwards compatible. +When `persistence` is omitted, nothing changes — the client behaves exactly as before. Passing a message adapter directly as `persistence: adapter` remains supported for compatibility, but it is deprecated. Use `persistence: { client: adapter }` in new code. Persistence is **best-effort**: if an adapter method throws or rejects, the error is swallowed so storage problems never break the chat. Handle and surface errors inside your adapter if you need to react to them. @@ -74,7 +81,9 @@ import { myPersistenceAdapter } from "./persistence"; const chat = useChat({ id: "conversation-123", connection: fetchServerSentEvents("/api/chat"), - persistence: myPersistenceAdapter, + persistence: { + client: myPersistenceAdapter, + }, }); ``` @@ -86,7 +95,9 @@ import { myPersistenceAdapter } from "./persistence"; const chat = useChat({ id: "conversation-123", connection: fetchServerSentEvents("/api/chat"), - persistence: myPersistenceAdapter, + persistence: { + client: myPersistenceAdapter, + }, }); ``` @@ -95,7 +106,9 @@ const chat = useChat({ const chat = createChat({ id: "conversation-123", connection: fetchServerSentEvents("/api/chat"), - persistence: myPersistenceAdapter, + persistence: { + client: myPersistenceAdapter, + }, }); ``` diff --git a/docs/config.json b/docs/config.json index ac0c6b7fc..1eeefa0b0 100644 --- a/docs/config.json +++ b/docs/config.json @@ -103,7 +103,8 @@ { "label": "Tool Approval Flow", "to": "tools/tool-approval", - "addedAt": "2026-04-15" + "addedAt": "2026-04-15", + "updatedAt": "2026-07-03" }, { "label": "Lazy Tool Discovery", @@ -140,7 +141,7 @@ "label": "MCP Apps", "to": "mcp/apps", "addedAt": "2026-06-24", - "updatedAt": "2026-06-26" + "updatedAt": "2026-07-03" } ] }, @@ -160,7 +161,8 @@ { "label": "Connection Adapters", "to": "chat/connection-adapters", - "addedAt": "2026-04-15" + "addedAt": "2026-04-15", + "updatedAt": "2026-07-03" }, { "label": "Thinking & Reasoning", @@ -170,7 +172,8 @@ { "label": "Persistence", "to": "chat/persistence", - "addedAt": "2026-06-02" + "addedAt": "2026-06-02", + "updatedAt": "2026-07-03" } ] }, @@ -319,7 +322,7 @@ "label": "Overview", "to": "sandbox/overview", "addedAt": "2026-06-16", - "updatedAt": "2026-06-30" + "updatedAt": "2026-07-03" }, { "label": "Quick Start", @@ -361,7 +364,8 @@ { "label": "Lifecycle & Snapshots", "to": "sandbox/lifecycle", - "addedAt": "2026-06-29" + "addedAt": "2026-06-29", + "updatedAt": "2026-07-03" }, { "label": "Events", @@ -380,6 +384,47 @@ } ] }, + { + "label": "Persistence", + "children": [ + { + "label": "Overview", + "to": "persistence/overview", + "addedAt": "2026-06-18", + "updatedAt": "2026-07-03" + }, + { + "label": "Resumable Chat", + "to": "persistence/resumable-chat", + "addedAt": "2026-07-03" + }, + { + "label": "Interrupts and Approvals", + "to": "persistence/interrupts-and-approvals", + "addedAt": "2026-07-03" + }, + { + "label": "SQL Backends", + "to": "persistence/sql-backends", + "addedAt": "2026-07-03" + }, + { + "label": "Cloudflare", + "to": "persistence/cloudflare", + "addedAt": "2026-07-03" + }, + { + "label": "Custom Stores", + "to": "persistence/custom-stores", + "addedAt": "2026-07-03" + }, + { + "label": "Sandbox Runs", + "to": "persistence/sandbox-runs", + "addedAt": "2026-07-03" + } + ] + }, { "label": "Advanced", "children": [ diff --git a/docs/mcp/apps.md b/docs/mcp/apps.md index 80ea623b0..46496cc2b 100644 --- a/docs/mcp/apps.md +++ b/docs/mcp/apps.md @@ -365,6 +365,10 @@ const handler = createMcpAppCallHandler({ clients: mcp, store }) > **Current limitation:** `inMemoryMcpSessionStore` is single-instance (one Node.js process). It does not survive serverless restarts or scale across replicas. The `McpSessionStore` interface is the persistence extension point — persistent backends (database, KV store) can be dropped in without any API changes. +For broader app-owned persistence primitives such as metadata, public replay +events, internal checkpoints, and custom SQL/KV stores, see +[Custom Stores](../persistence/custom-stores). + ## API Reference ### `createMcpAppCallHandler` (`@tanstack/ai-mcp/apps`) diff --git a/docs/media/transcription.md b/docs/media/transcription.md index 61f8b4d40..d7821ab27 100644 --- a/docs/media/transcription.md +++ b/docs/media/transcription.md @@ -2,7 +2,7 @@ title: Transcription id: transcription order: 4 -description: "Transcribe audio to text with OpenAI Whisper and GPT-4o-transcribe via TanStack AI's generateTranscription() API." +description: "Transcribe audio to text with OpenAI Whisper and current transcription models via TanStack AI's generateTranscription() API." keywords: - tanstack ai - transcription @@ -15,14 +15,14 @@ keywords: # Audio Transcription -TanStack AI provides support for audio transcription (speech-to-text) through dedicated transcription adapters. This guide covers how to convert spoken audio into text using OpenAI's Whisper and GPT-4o transcription models. +TanStack AI provides support for audio transcription (speech-to-text) through dedicated transcription adapters. This guide covers how to convert spoken audio into text using OpenAI's Whisper and current transcription models. ## Overview Audio transcription is handled by transcription adapters that follow the same tree-shakeable architecture as other adapters in TanStack AI. Currently supported: -- **OpenAI**: Whisper-1, GPT-4o-transcribe, GPT-4o-mini-transcribe +- **OpenAI**: Whisper-1 and current OpenAI transcription models - **fal.ai**: Whisper, Wizper, speech-to-text turbo, ElevenLabs speech-to-text ## Basic Usage @@ -247,9 +247,7 @@ await transcribeAudio('./meeting-recording.mp3') | Model | Description | Use Case | |-------|-------------|----------| | `whisper-1` | Whisper large-v2 | General transcription | -| `gpt-4o-transcribe` | GPT-4o-based transcription | Higher accuracy | -| `gpt-4o-transcribe-diarize` | With speaker diarization | Multi-speaker audio | -| `gpt-4o-mini-transcribe` | Faster, lighter model | Cost-effective | +| Current OpenAI transcription models | Latest dedicated speech-to-text models | Higher accuracy and model-specific capabilities | ### Supported Audio Formats @@ -559,4 +557,3 @@ const adapter = createOpenaiTranscription('whisper-1', 'your-openai-api-key') 5. **Prompting**: Use the `prompt` option to provide context or expected vocabulary (e.g., technical terms, names). 6. **Timestamps**: Request `verbose_json` format and enable `timestamp_granularities: ['word', 'segment']` when you need timing information for captions or synchronization. - diff --git a/docs/persistence/cloudflare.md b/docs/persistence/cloudflare.md new file mode 100644 index 000000000..11c873729 --- /dev/null +++ b/docs/persistence/cloudflare.md @@ -0,0 +1,105 @@ +--- +title: Cloudflare Persistence +id: cloudflare +--- + +Use the Cloudflare backend when `chat()` runs in Workers and persistence should +stay on Cloudflare primitives. D1 stores the core SQL state, R2 can hold blobs +and artifact bytes, and Durable Objects can provide cross-isolate locks. + +## Bind D1, R2, and Durable Objects + +```ts +import { cloudflarePersistence } from '@tanstack/ai-persistence-cloudflare' + +interface Env { + AI_D1: D1Database + AI_BLOBS: R2Bucket + AI_LOCKS: DurableObjectNamespace +} + +export function persistence(env: Env) { + return cloudflarePersistence({ + d1: env.AI_D1, + r2: env.AI_BLOBS, + durableObjects: env.AI_LOCKS, + r2ArtifactPrefix: 'tanstack-ai/artifacts/', + r2BlobPrefix: 'tanstack-ai/blobs/', + }) +} +``` + +`d1` is required. `r2` and `durableObjects` are optional; include them only when +your app needs blob/artifact storage or distributed locks. + +## Core state lives in D1 + +D1 backs the shared SQL stores: + +- runs, +- public replay events, +- internal events, +- messages, +- interrupts, +- metadata, +- migration bookkeeping. + +That means reconnect and resume behavior does not depend on R2. R2 only stores +byte payloads for the optional blob and artifact stores. + +## Store blobs and artifacts in R2 + +When you pass `r2`, the backend adds: + +- `stores.blobs` backed by R2 objects, +- `stores.artifacts` with D1 metadata/index rows and R2-backed bytes. + +Artifact `list(runId)` reads the D1 index without downloading byte bodies. +Artifact `get(artifactId)` hydrates bytes from R2 when the artifact record +points at a blob. Optional artifact cleanup deletes both the D1 row and the R2 +object. + +## Use Durable Objects for locks + +Pass a Durable Object namespace when sandbox resume, workflow coordination, or +another feature needs cross-isolate mutual exclusion. + +```ts +import { + LockDurableObject, + cloudflarePersistence, +} from '@tanstack/ai-persistence-cloudflare' + +export { LockDurableObject } + +export function persistence(env: Env) { + return cloudflarePersistence({ + d1: env.AI_D1, + durableObjects: env.AI_LOCKS, + durableObjectLocks: { + leaseMs: 30_000, + pollMs: 50, + }, + }) +} +``` + +Bind the exported `LockDurableObject` class in your Worker config and pass that +namespace as `durableObjects`. + +## Self-managed schema + +Cloudflare migrations are lazy by default. If you deploy schema separately, set +`migrate: false` and apply both the shared SQL DDL and the Cloudflare artifact +index DDL. + +```ts +import { cloudflareArtifactDdl } from '@tanstack/ai-persistence-cloudflare' +import { ddl } from '@tanstack/ai-persistence-sql' + +export const statements = [...ddl('sqlite'), ...cloudflareArtifactDdl()] +``` + +D1 is SQLite-compatible, so use the SQLite dialect for the shared core DDL. The +artifact DDL creates the Cloudflare-specific artifact index table used when R2 +is attached. diff --git a/docs/persistence/custom-stores.md b/docs/persistence/custom-stores.md new file mode 100644 index 000000000..e858f95aa --- /dev/null +++ b/docs/persistence/custom-stores.md @@ -0,0 +1,126 @@ +--- +title: Custom Stores +id: custom-stores +--- + +Use custom stores when the built-in backends do not match your infrastructure, +or when an integration needs to add durable metadata, locks, artifacts, blobs, +or internal checkpoints without changing the public chat replay stream. + +This page is reference material for adapter authors. If you only need to choose +a packaged backend, start with [SQL Backends](./sql-backends) or +[Cloudflare](./cloudflare). If you need the end-to-end chat journey, start with +[Resumable Chat](./resumable-chat). + +## Define an `AIPersistence` + +`AIPersistence` is an aggregate of optional stores. Implement only the stores +your scenario needs, then validate features with `withPersistence(...)`. During +development, `memoryPersistence()` is a useful complete baseline: replace one +store at a time with your backend implementation while keeping the feature +validation behavior realistic. + +```ts +import { + defineAIPersistence, + memoryPersistence, + withPersistence, +} from '@tanstack/ai-persistence' +import type { MetadataStore } from '@tanstack/ai-persistence' + +class AcmeMetadataStore implements MetadataStore { + private values = new Map() + + async get(scope: string, key: string) { + return this.values.get(`${scope}:${key}`) ?? null + } + + async set(scope: string, key: string, value: unknown) { + this.values.set(`${scope}:${key}`, value) + } + + async delete(scope: string, key: string) { + this.values.delete(`${scope}:${key}`) + } +} + +const baseline = memoryPersistence() + +const persistence = defineAIPersistence({ + stores: { + ...baseline.stores, + metadata: new AcmeMetadataStore(), + }, +}) + +const middleware = withPersistence(persistence, { + features: ['messages', 'durable-replay', 'interrupts', 'metadata'], +}) +``` + +If any required store is missing, setup fails before the run starts. + +| Feature | Required stores | +| --- | --- | +| `messages` | `stores.messages` | +| `durable-replay` | `stores.runs`, `stores.publicEvents` | +| `interrupts` | `stores.runs`, `stores.publicEvents`, `stores.interrupts` | +| `internal-events` | `stores.internalEvents` | +| `metadata` | `stores.metadata` | +| `locks` | `stores.locks` | +| `artifacts` | `stores.artifacts` | +| `blobs` | `stores.blobs` | + +## Keep public and internal events separate + +`PublicEventStore` is the user-visible AG-UI stream. It is what reconnecting +clients replay after an opaque cursor. Store exactly the public `StreamChunk` +events there. + +`InternalEventStore` is for package-owned or app-owned checkpoints: +compare-and-swap coordination, workflow checkpoints, adapter internals, or +other state that must not be replayed to the UI. Keep those events namespaced +and separate from the public stream. + +## Store app metadata + +Use `MetadataStore` for durable key/value state associated with a thread, run, +or integration. MCP session correlation is a good example: the base persistence +schema records public stream replay, while app-owned metadata can map a thread +or run to an MCP session id. + +```ts +await persistence.stores.metadata?.set( + 'thread:weather-chat', + 'mcp-session', + { serverId: 'weather', sessionId: 'session-123' }, +) +``` + +Use a stable scope convention such as `thread:` or `run:` so +multiple integrations do not collide. + +## Add locks, artifacts, and blobs when needed + +`stores.locks` provides a shared `LockStore` capability. Sandboxes and workflow +extensions use it to prevent two processes from resuming or mutating the same +durable resource at the same time. + +`stores.artifacts` stores generated artifacts by `artifactId`, `runId`, and +`threadId`. `stores.blobs` stores raw bytes and can be shared by artifacts or +other integrations. A backend may store artifact metadata in SQL and bytes in +object storage, as the [Cloudflare backend](./cloudflare) does with D1 and R2. + +## Extend without growing the base schema + +MCP and workflow packages should build on the common stores instead of adding +new base persistence tables for every feature: + +- use public events for UI replay, +- use internal events for checkpoints, +- use metadata for app-owned correlation, +- use locks for cross-process coordination, +- use artifacts and blobs for durable outputs. + +That keeps resumable chat small for apps that only need messages and replay, +while still giving advanced integrations durable primitives. diff --git a/docs/persistence/interrupts-and-approvals.md b/docs/persistence/interrupts-and-approvals.md new file mode 100644 index 000000000..b6329e66c --- /dev/null +++ b/docs/persistence/interrupts-and-approvals.md @@ -0,0 +1,116 @@ +--- +title: Interrupts and Approvals +id: interrupts-and-approvals +--- + +Use interrupt persistence when a run can pause for a human decision and resume +later without losing the stream, rerunning model work, or accepting new input +that forks around the pending choice. + +## How durable interrupts work + +With the `interrupts` feature, persistence stores: + +- the run record in `stores.runs`, +- the public AG-UI event log in `stores.publicEvents`, +- pending waits in `stores.interrupts`. + +```ts +import { chat, toServerSentEventsResponse } from '@tanstack/ai' +import { anthropicText } from '@tanstack/ai-anthropic' +import { withPersistence } from '@tanstack/ai-persistence' +import { sqlitePersistence } from '@tanstack/ai-persistence-sqlite' +import { sendEmail } from './tools' + +const persistence = sqlitePersistence({ + path: '.tanstack-ai/state.sqlite', +}) + +export async function POST(request: Request) { + const { messages, threadId, runId, cursor, resume } = await request.json() + + const stream = chat({ + threadId, + runId, + cursor, + resume, + adapter: anthropicText('claude-sonnet-4-6'), + messages, + tools: [sendEmail], + middleware: [ + withPersistence(persistence, { + features: ['interrupts'], + }), + ], + }) + + return toServerSentEventsResponse(stream) +} +``` + +Feature validation is fail-loud. If a custom backend omits one of the required +stores, the middleware throws during setup instead of silently dropping pending +waits. + +## Resume with AG-UI `resume[]` + +A user-actionable wait is represented in the public stream by +`RUN_FINISHED.outcome.type === 'interrupt'`. The client should collect the +pending interrupt ids, ask the user for a decision, then resume the same run +with AG-UI `RunAgentInput.resume[]` entries. + +```ts +await chat.resumeInterrupts([ + { + interruptId: 'send-email-approval', + status: 'resolved', + payload: { approved: true }, + }, +]) +``` + +Normal new input on the same thread is rejected by default while pending +interrupts exist. That keeps the server from accidentally creating a second +conversation branch before the existing decision has been resolved or +cancelled. + +## Approval compatibility + +Tool approvals are the common UI shape for interrupts. A tool with +`needsApproval: true` can pause the run, surface an approval request, and resume +after the user approves or denies it. + +```ts +import { toolDefinition } from '@tanstack/ai' +import { z } from 'zod' + +const sendEmailDef = toolDefinition({ + name: 'send_email', + description: 'Send an email to a recipient', + inputSchema: z.object({ + to: z.string().email(), + subject: z.string(), + body: z.string(), + }), + outputSchema: z.object({ + success: z.boolean(), + messageId: z.string(), + }), + needsApproval: true, +}) + +export const sendEmail = sendEmailDef.server(async ({ to, subject, body }) => { + const response = await fetch('https://email.example.com/send', { + method: 'POST', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ to, subject, body }), + }) + return { success: response.ok, messageId: crypto.randomUUID() } +}) +``` + +Older approval-specific event projections remain compatibility behavior. New +durable flows should treat the interrupt outcome and `resume[]` payload as the +source of truth, then render approval UI as one way to collect the user's +response. For basic approval rendering without server persistence, see +[Tool Approval Flow](../tools/tool-approval). diff --git a/docs/persistence/overview.md b/docs/persistence/overview.md new file mode 100644 index 000000000..5b42f94b5 --- /dev/null +++ b/docs/persistence/overview.md @@ -0,0 +1,92 @@ +--- +title: Persistence Overview +id: overview +--- + +Persistence makes server-side `chat()` runs durable and resumable. Add +`withPersistence(...)` when the server should own the thread transcript, replay +streamed AG-UI events after a reconnect, resume pending user decisions, or share +storage primitives with sandbox and workflow extensions. + +Persistence is optional middleware. A run with no persistence middleware behaves +exactly as before. + +## What persistence stores + +`withPersistence(...)` can: + +- load and save a thread's server-authoritative message history, +- create and update run records with status, usage, and errors, +- append every streamed public AG-UI event with an opaque cursor, +- replay persisted events after `{ threadId, runId, cursor }`, +- persist pending user-actionable interrupts, +- validate optional stores before a feature silently loses durability, +- provide locks, metadata, artifacts, and blobs to integrations that need them. + +The primary extension point is `AIPersistence`, usually created with +`defineAIPersistence(...)` or a backend factory such as +`sqlitePersistence(...)`. The older `ChatPersistence` and +`defineChatPersistence` names remain deprecated compatibility aliases. + +## Pick the page for your scenario + +If you are building a persisted chat UI, start with +[Resumable Chat](./resumable-chat). It shows both sides: a server endpoint using +`withPersistence` and a client that forwards `threadId`, `runId`, `cursor`, and +`resume` so reconnects continue the same run. Use the headless +`persistence.server` option to store the client's latest resume snapshot for +full page reload recovery. + +If your run can pause for a human decision, read +[Interrupts and Approvals](./interrupts-and-approvals). It covers durable +pending waits, AG-UI `resume[]` payloads, and how legacy approval UI maps onto +the newer interrupt path. + +If you are choosing a Node database backend, read +[SQL Backends](./sql-backends). SQLite, Postgres, Drizzle, and Prisma share the +same persistence model but differ in migration ownership. The shared SQL core +also has a MySQL dialect for adapter authors, but there is not a standalone +MySQL backend package. + +If you deploy on Workers, read [Cloudflare](./cloudflare). D1 stores the core +run state, R2 can store blobs and artifact bytes, and Durable Objects can +provide cross-isolate locks. + +If you own the storage layer or are extending persistence for MCP/workflows, +read [Custom Stores](./custom-stores). It explains the optional `AIPersistence` +stores, public versus internal events, metadata, locks, artifacts, and blobs. + +If you persist coding-agent sandboxes, read +[Sandbox Runs](./sandbox-runs). It shows how +`@tanstack/ai-sandbox-persistence` bridges durable sandbox records and locks +between `withPersistence` and `withSandbox`. + +## Installation + +Install the core package and the backend package that matches your deployment: + +```sh +pnpm add @tanstack/ai-persistence @tanstack/ai-persistence-sqlite +``` + +For prototypes and tests, `memoryPersistence()` ships in +`@tanstack/ai-persistence`. Durable apps usually use one of the backend +packages: `@tanstack/ai-persistence-sqlite`, +`@tanstack/ai-persistence-postgres`, `@tanstack/ai-persistence-cloudflare`, +`@tanstack/ai-persistence-drizzle`, or `@tanstack/ai-persistence-prisma`. + +## Durable replay state + +The replay identity is: + +```ts +type ResumeState = { + threadId: string + runId: string + cursor: string +} +``` + +Treat `cursor` as opaque. Store and forward it, but do not parse it. When a +request includes a cursor, `chat()` replays the persisted event tail after that +cursor instead of calling the model again. diff --git a/docs/persistence/resumable-chat.md b/docs/persistence/resumable-chat.md new file mode 100644 index 000000000..b79e5748f --- /dev/null +++ b/docs/persistence/resumable-chat.md @@ -0,0 +1,152 @@ +--- +title: Resumable Chat +id: resumable-chat +--- + +Use server persistence when the server should be authoritative for a chat +thread. The client may still keep local UI state, but the durable transcript, +run status, and replayable event log live behind `withPersistence(...)`. + +By the end, your endpoint accepts `{ threadId, runId, cursor, resume }`, writes +each streamed chunk to durable storage, and lets the client resume after an +in-session disconnect or full page reload. + +## Install a backend + +SQLite is the simplest durable backend for a Node server: + +```sh +pnpm add @tanstack/ai-persistence @tanstack/ai-persistence-sqlite +``` + +## Create the server endpoint + +Build the persistence instance once and reuse it across requests. + +```ts +import { chat, toServerSentEventsResponse } from '@tanstack/ai' +import { anthropicText } from '@tanstack/ai-anthropic' +import { withPersistence } from '@tanstack/ai-persistence' +import { sqlitePersistence } from '@tanstack/ai-persistence-sqlite' + +const persistence = sqlitePersistence({ + path: '.tanstack-ai/state.sqlite', +}) + +export async function POST(request: Request) { + const { messages, threadId, runId, cursor, resume } = await request.json() + + const stream = chat({ + threadId, + runId, + cursor, + resume, + adapter: anthropicText('claude-sonnet-4-6'), + messages, + middleware: [withPersistence(persistence)], + }) + + return toServerSentEventsResponse(stream) +} +``` + +`withPersistence` loads the stored thread history, saves the resulting +transcript, records run status, and appends every public AG-UI event with a +cursor. When `cursor` is present, the run replays persisted events after that +cursor instead of re-running the adapter. + +## Wire the client + +The chat client forwards the resume fields through its connection adapter. Keep +a stable `threadId` per conversation so a reload returns to the same server +thread. Use `persistence.server` to store the client's latest resume snapshot +under that `threadId` so reloads can continue the same durable run. + +```tsx +import { fetchServerSentEvents, useChat } from '@tanstack/ai-react' +import { localStorageChatPersistence } from '@tanstack/ai-client' + +const threadId = 'thread-123' + +export function Chat() { + const chat = useChat({ + id: threadId, + threadId, + connection: fetchServerSentEvents('/api/chat'), + persistence: { + client: localStorageChatPersistence({ + keyPrefix: 'tanstack-ai:messages:', + }), + server: localStorageChatPersistence({ + keyPrefix: 'tanstack-ai:resume:', + }), + }, + }) + + return ( +
{ + event.preventDefault() + const form = event.currentTarget + const input = new FormData(form).get('message') + if (typeof input === 'string' && input.trim()) { + chat.sendMessage({ content: input }) + form.reset() + } + }} + > + {chat.messages.map((message) => ( +

+ {message.parts.map((part, index) => + part.type === 'text' ? {part.content} : null, + )} +

+ ))} + + +
+ ) +} +``` + +Auto-resume is enabled by default. On mount, reconnect, or when the tab comes +back online, the client can continue an interrupted run by forwarding the last +known `{ threadId, runId, cursor }`. Opt out with `autoResume: false`, or call +`chat.resume()` when you want a manual retry button. + +`chat.resumeState` contains the active resume identity, or `null` when there is +nothing to continue. `chat.pendingInterrupts` contains the client-side +descriptors needed to answer pending user decisions. `persistence.server` +stores them together and hydrates them on the next client construction. The +server remains authoritative: the stored snapshot only tells the client which +durable run to reconnect to and which pending interrupts it can answer. + +## Client message storage is separate + +Client-side chat persistence stores rendered `UIMessage` history in +`localStorage`, IndexedDB, or another browser-side adapter. Server persistence +stores model messages, run records, and replayable public events. You can use +both with `persistence: { client, server }`, but they solve different problems. +See [Chat Persistence](../chat/persistence) for client-only message storage. + +## Resume pending decisions + +If the server finishes with `RUN_FINISHED.outcome.type === 'interrupt'`, the +thread has a pending user-actionable wait. Resolve those waits with +`chat.resumeInterrupts(...)`; the client forwards them as AG-UI +`RunAgentInput.resume[]` entries on the next request. If the page reloads +before the user answers, `persistence.server` restores the pending interrupts +the client needs to render and resume. + +```ts +await chat.resumeInterrupts([ + { + interruptId: 'interrupt-1', + status: 'resolved', + payload: { approved: true }, + }, +]) +``` + +For approval-specific UI and compatibility details, see +[Interrupts and Approvals](./interrupts-and-approvals). diff --git a/docs/persistence/sandbox-runs.md b/docs/persistence/sandbox-runs.md new file mode 100644 index 000000000..3af22ba35 --- /dev/null +++ b/docs/persistence/sandbox-runs.md @@ -0,0 +1,102 @@ +--- +title: Sandbox Runs +id: sandbox-runs +--- + +Use sandbox persistence when a coding-agent harness needs to resume the same +sandbox across processes, not just within one Node.js process. The bridge +package connects core `AIPersistence` stores to the sandbox layer without making +either package depend directly on the other. + +## Install the bridge + +```sh +pnpm add @tanstack/ai-sandbox-persistence +``` + +You also need a SQL driver from the persistence backend you already use for +`withPersistence(...)`. + +## Create a durable sandbox store + +```ts +import { createSqliteDriver } from '@tanstack/ai-persistence-sqlite' +import { createSqlSandboxStore } from '@tanstack/ai-sandbox-persistence' + +const driver = createSqliteDriver({ path: '.tanstack-ai/state.sqlite' }) +const sandboxStore = createSqlSandboxStore(driver) +``` + +`createSqlSandboxStore(...)` creates the `sandbox_instances` table lazily. It +stores the sandbox instance key, provider id, provider sandbox id, latest +snapshot id, thread id, latest run id, and update time. + +## Bridge persistence into `withSandbox` + +Place `withPersistenceBridge(...)` after `withPersistence(...)` and before +`withSandbox(...)`. + +```ts +import { chat } from '@tanstack/ai' +import { claudeCodeText } from '@tanstack/ai-claude-code' +import { withPersistence } from '@tanstack/ai-persistence' +import { sqlitePersistence, createSqliteDriver } from '@tanstack/ai-persistence-sqlite' +import { defineSandbox, withSandbox } from '@tanstack/ai-sandbox' +import { dockerSandbox } from '@tanstack/ai-sandbox-docker' +import { + createSqlSandboxStore, + withPersistenceBridge, +} from '@tanstack/ai-sandbox-persistence' +import type { ModelMessage } from '@tanstack/ai' + +const dbPath = '.tanstack-ai/state.sqlite' +const persistence = sqlitePersistence({ path: dbPath }) +const sandboxStore = createSqlSandboxStore(createSqliteDriver({ path: dbPath })) + +const repoSandbox = defineSandbox({ + id: 'repo-agent', + provider: dockerSandbox({ image: 'node:22' }), +}) + +const messages: Array = [ + { role: 'user', content: 'Resume this repository task.' }, +] + +chat({ + threadId: 'thread-123', + runId: 'run-123', + adapter: claudeCodeText('claude-sonnet-4-6'), + messages, + middleware: [ + withPersistence(persistence), + withPersistenceBridge({ + persistence, + sandboxStore, + }), + withSandbox(repoSandbox), + ], +}) +``` + +The bridge provides the durable sandbox store and, when present, the persistence +lock store. The sandbox layer uses those capabilities to resume an existing +provider sandbox and to serialize ensure/resume work across processes. + +## Locks + +If the persistence backend exposes `stores.locks`, the bridge provides it to the +sandbox layer. On Cloudflare, that usually means using Durable Object locks. On +Node, use a backend-specific lock store when your deployment has more than one +process that can resume the same sandbox key. + +Without a durable lock, two workers may try to ensure the same sandbox at the +same time. Without a durable sandbox store, resume only works inside the process +that still has the in-memory record. + +## Harness resume + +Persistence replay and sandbox resume are separate pieces that work together. +`withPersistence` replays the public event tail for the chat client. The +sandbox store lets `withSandbox` find the same provider sandbox. A harness +adapter that supports reattach can then reconnect to the still-running agent +process and continue live after replay. diff --git a/docs/persistence/sql-backends.md b/docs/persistence/sql-backends.md new file mode 100644 index 000000000..95bcfeee5 --- /dev/null +++ b/docs/persistence/sql-backends.md @@ -0,0 +1,128 @@ +--- +title: SQL Backends +id: sql-backends +--- + +Use a SQL backend when your app runs in Node or another server runtime with a +database connection. SQLite, Postgres, and higher-level Drizzle or Prisma +clients all expose the same `AIPersistence` stores to `withPersistence(...)`. +The shared SQL core includes a MySQL dialect for adapter authors, but the +published raw backend packages are SQLite and Postgres. + +## Raw SQL backends + +SQLite is file-backed and works well for local apps, prototypes, and single-node +deployments. + +```ts +import { sqlitePersistence } from '@tanstack/ai-persistence-sqlite' + +export const persistence = sqlitePersistence({ + path: '.tanstack-ai/state.sqlite', +}) +``` + +Postgres is the usual durable choice for replicated server deployments. + +```ts +import { postgresPersistence } from '@tanstack/ai-persistence-postgres' + +export const persistence = postgresPersistence({ + connectionString: process.env.DATABASE_URL ?? '', +}) +``` + +Both raw backends use the shared SQL core. They create the persistence tables +lazily on first use by default. + +## Shared schema + +The base SQL schema is intentionally small: + +- `runs` +- `public_events` +- `internal_events` +- `messages` +- `interrupts` +- `metadata` +- `_tanstack_ai_migrations` + +Public events are the replayable AG-UI stream. Internal events are separate +package or app checkpoints and are not returned to reconnecting chat clients. + +## Own migrations with `migrate: false` + +If your deployment applies migrations separately, disable lazy migrations and +run the exported DDL yourself. + +```ts +import { ddl } from '@tanstack/ai-persistence-sql' +import { postgresPersistence } from '@tanstack/ai-persistence-postgres' + +export const persistence = postgresPersistence({ + connectionString: process.env.DATABASE_URL ?? '', + migrate: false, +}) + +export const migrationStatements = ddl('postgres') +``` + +Apply those statements with your normal migration system before traffic reaches +the app. + +## Drizzle and Prisma + +Use the ORM-backed packages when your app already owns its database client and +schema lifecycle. + +```ts +import { drizzlePersistence } from '@tanstack/ai-persistence-drizzle' +import { prismaPersistence } from '@tanstack/ai-persistence-prisma' +import type { DrizzleDb } from '@tanstack/ai-persistence-drizzle' +import type { PrismaRawClient } from '@tanstack/ai-persistence-prisma' + +declare const db: DrizzleDb +declare const prisma: PrismaRawClient + +export const drizzle = drizzlePersistence({ db, dialect: 'postgres' }) +export const prismaStore = prismaPersistence({ prisma, dialect: 'postgres' }) +``` + +Drizzle and Prisma users usually manage schema changes through their own +migration workflow. Pass `migrate: false` when you want that workflow to be the +only source of schema changes. Prisma can use the shared SQL dialects exposed +by `@tanstack/ai-persistence-sql`; Drizzle's built-in unwrapping currently +targets SQLite-shaped and node-postgres clients. + +## MySQL-compatible deployments + +There is no `@tanstack/ai-persistence-mysql` package today. If your app needs a +MySQL-compatible database, use the shared SQL core from +`@tanstack/ai-persistence-sql` through an adapter that provides a `SqlDriver` +with `dialect: 'mysql'`, or use `@tanstack/ai-persistence-prisma` with +`dialect: 'mysql'` and your own Prisma migration workflow. + +```ts +import { ddl } from '@tanstack/ai-persistence-sql' +import { prismaPersistence } from '@tanstack/ai-persistence-prisma' +import type { PrismaRawClient } from '@tanstack/ai-persistence-prisma' + +declare const prisma: PrismaRawClient + +export const persistence = prismaPersistence({ + prisma, + dialect: 'mysql', + migrate: false, +}) + +export const migrationStatements = ddl('mysql') +``` + +## Choosing a backend + +Use SQLite when one process owns the database file. Use Postgres when multiple +app instances need the same durable run state. Use Drizzle or Prisma when those +clients are already how your application accesses its database. Use the shared +SQL core or Prisma path when you need a MySQL-compatible deployment. Use +[Cloudflare](./cloudflare) instead of the Node SQL backends when your runtime is +Workers and your database is D1. diff --git a/docs/sandbox/lifecycle.md b/docs/sandbox/lifecycle.md index 4ccde4afd..8a1f9b8ae 100644 --- a/docs/sandbox/lifecycle.md +++ b/docs/sandbox/lifecycle.md @@ -115,3 +115,7 @@ Providers without durable disk or snapshots (e.g. ephemeral containers) re-create and re-bootstrap under the same identity: the `sandboxInstanceKey` stays stable, but every run pays the bootstrap cost because there is nothing durable to resume. + +To make sandbox resume records and ensure-locking durable across processes, wire +`@tanstack/ai-sandbox-persistence` between `withPersistence` and `withSandbox`. +See [Sandbox Runs](../persistence/sandbox-runs). diff --git a/docs/sandbox/overview.md b/docs/sandbox/overview.md index 52adcf8ba..981a69a24 100644 --- a/docs/sandbox/overview.md +++ b/docs/sandbox/overview.md @@ -116,6 +116,8 @@ sandbox on your laptop. Then dive into the piece you need: - **[Lifecycle & Snapshots](./lifecycle)** — reuse, snapshot-after-setup, and resume. - **[Events & File Hooks](./events)** — stream the agent's edits and activity to a UI. - **[Cloudflare (edge)](./cloudflare)** — run the agent and a live preview at the edge. +- **[Sandbox Runs](../persistence/sandbox-runs)** — persist sandbox run records, + resume bookkeeping, and locks across processes. ## Try it @@ -133,7 +135,8 @@ hands back a live preview URL, see `examples/sandbox-web` — one app with harne (Claude Code / Codex / OpenCode / Grok) and provider (Docker / local / Vercel / Daytona) pickers. -> **Persistence-ready:** the sandbox layer ships with in-memory stores for -> resume bookkeeping. A future persistence package can provide durable -> `SandboxStore` / `LockStore` implementations (and event-log replay) by -> supplying those optional capabilities — no changes to the sandbox layer. +> **Durable sandbox runs:** the sandbox layer ships with in-memory stores for +> local resume bookkeeping. Use +> [Sandbox Runs](../persistence/sandbox-runs) when resume records, locks, and +> event-log replay need to survive process restarts or run across multiple +> workers. diff --git a/docs/tools/tool-approval.md b/docs/tools/tool-approval.md index 1d87a3ff8..08c0a6ee5 100644 --- a/docs/tools/tool-approval.md +++ b/docs/tools/tool-approval.md @@ -23,6 +23,11 @@ The tool approval flow allows you to require user approval before executing sens After `approval-responded` the call executes (if approved). Although `complete` exists in the `ToolCallState` union, the runtime never transitions the tool-call part to it — the result surfaces as a populated `part.output` plus a sibling `tool-result` part whose own state is `complete` or `error`. +For approval flows that must survive disconnects, reloads, or process restarts, +pair approvals with server persistence. See +[Interrupts and Approvals](../persistence/interrupts-and-approvals) for the +durable `withPersistence` path. + When a tool requires approval, the typical flow is: 1. Model calls the tool diff --git a/examples/ts-react-chat/.env.example b/examples/ts-react-chat/.env.example index 90fab331c..a230d9199 100644 --- a/examples/ts-react-chat/.env.example +++ b/examples/ts-react-chat/.env.example @@ -2,6 +2,15 @@ # Get yours at: https://platform.openai.com/api-keys OPENAI_API_KEY= +# MySQL persistence demo (/mysql-persistence) +# Use MYSQL_URL or the individual MYSQL_* settings below. +# MYSQL_URL=mysql://root:password@127.0.0.1:3306/tanstack_ai_chat +MYSQL_HOST=127.0.0.1 +MYSQL_PORT=3306 +MYSQL_USER=root +MYSQL_PASSWORD= +MYSQL_DATABASE=tanstack_ai_chat + # Anthropic API Key (chat) # Get yours at: https://console.anthropic.com/settings/keys ANTHROPIC_API_KEY= diff --git a/examples/ts-react-chat/README.md b/examples/ts-react-chat/README.md index 1a07c1533..17a852ed0 100644 --- a/examples/ts-react-chat/README.md +++ b/examples/ts-react-chat/README.md @@ -37,8 +37,26 @@ An example chat application built with TanStack Start, TanStack Store, and **Tan ```env OPENAI_API_KEY=your_openai_api_key +MYSQL_URL=mysql://root:password@127.0.0.1:3306/tanstack_ai_chat ``` +## MySQL Persistence Demo + +The `/mysql-persistence` route demonstrates durable chat resume with MySQL. +Create a database, set either `MYSQL_URL` or the individual `MYSQL_HOST`, +`MYSQL_PORT`, `MYSQL_USER`, `MYSQL_PASSWORD`, and `MYSQL_DATABASE` variables, +then run the app and open `/mysql-persistence`. + +```sql +CREATE DATABASE tanstack_ai_chat; +``` + +The example stores messages, durable replay events, and interrupts through +`@tanstack/ai-persistence-sql`. Refresh while a response is streaming to +continue tailing the in-progress run from MySQL within the same dev/server +process. Reattaching to a producer after restarting the server process is out +of scope for this example. + ## Trying Out Lazy Tool Discovery This example includes three **lazy tools** — tools that are not sent to the LLM upfront. Instead, the LLM sees a `__lazy__tool__discovery__` tool that lists their names. When the LLM needs one, it discovers it first (getting the full description and schema), then calls it normally. @@ -78,7 +96,7 @@ The lazy tools are: `compareGuitars`, `calculateFinancing`, and `searchGuitars`. ### AI Capabilities -- 🤖 Powered by **TanStack AI** with OpenAI GPT-4o +- 🤖 Powered by **TanStack AI** with OpenAI `gpt-5.5` - 📝 Rich markdown formatting with syntax highlighting - 🎯 Customizable system prompts for tailored AI behavior - 🔄 Real-time streaming responses with Server-Sent Events @@ -107,7 +125,7 @@ The lazy tools are: `compareGuitars`, `calculateFinancing`, and `searchGuitars`. - **Routing**: TanStack Router - **State Management**: TanStack Store - **Styling**: Tailwind CSS -- **AI Integration**: TanStack AI with OpenAI GPT-4o +- **AI Integration**: TanStack AI with OpenAI `gpt-5.5` - **Chat Client**: `@tanstack/ai-react` with connection adapters - **Streaming**: Server-Sent Events via `fetchServerSentEvents` - **Tool Execution**: Automatic loop with `ToolCallManager` diff --git a/examples/ts-react-chat/package.json b/examples/ts-react-chat/package.json index 67b58ad43..332fd682c 100644 --- a/examples/ts-react-chat/package.json +++ b/examples/ts-react-chat/package.json @@ -34,6 +34,8 @@ "@tanstack/ai-openai": "workspace:*", "@tanstack/ai-opencode": "workspace:*", "@tanstack/ai-openrouter": "workspace:*", + "@tanstack/ai-persistence": "workspace:*", + "@tanstack/ai-persistence-sql": "workspace:*", "@tanstack/ai-react": "workspace:*", "@tanstack/ai-react-ui": "workspace:*", "@tanstack/ai-sandbox": "workspace:*", @@ -52,6 +54,7 @@ "@tanstack/store": "^0.8.0", "highlight.js": "^11.11.1", "lucide-react": "^0.561.0", + "mysql2": "^3.22.5", "react": "^19.2.3", "react-dom": "^19.2.3", "react-markdown": "^10.1.0", diff --git a/examples/ts-react-chat/src/components/Header.tsx b/examples/ts-react-chat/src/components/Header.tsx index e7fbc6a0a..2f4757619 100644 --- a/examples/ts-react-chat/src/components/Header.tsx +++ b/examples/ts-react-chat/src/components/Header.tsx @@ -5,6 +5,7 @@ import { Activity, BadgeCheck, Braces, + Database, FileAudio, FileText, Guitar, @@ -231,6 +232,19 @@ export default function Header() { Persistent Chats + setIsOpen(false)} + className="flex items-center gap-3 p-3 rounded-lg hover:bg-gray-800 transition-colors mb-2" + activeProps={{ + className: + 'flex items-center gap-3 p-3 rounded-lg bg-cyan-600 hover:bg-cyan-700 transition-colors mb-2', + }} + > + + MySQL Persistence + + setIsOpen(false)} diff --git a/examples/ts-react-chat/src/lib/mysql-persistence.test.ts b/examples/ts-react-chat/src/lib/mysql-persistence.test.ts new file mode 100644 index 000000000..4b2ec5cad --- /dev/null +++ b/examples/ts-react-chat/src/lib/mysql-persistence.test.ts @@ -0,0 +1,94 @@ +import { describe, expect, it } from 'vitest' +import { EventType } from '@tanstack/ai' +import { createMysqlPersistence } from './mysql-persistence' +import type { Pool, PoolConnection } from 'mysql2/promise' + +function createFakePool() { + const calls: Array = [] + let inserted = false + const connection = { + async execute(sql: string) { + calls.push(sql) + if (sql.includes('SELECT seq, event FROM public_events')) { + return [ + inserted + ? [ + { + seq: 1, + event: JSON.stringify({ + type: EventType.TEXT_MESSAGE_CONTENT, + messageId: 'm1', + delta: 'hi', + timestamp: 1, + }), + }, + ] + : [], + [], + ] + } + if (sql.includes('SELECT MAX(seq) AS max_seq')) { + return [[{ max_seq: 0 }], []] + } + if (sql.includes('INSERT INTO public_events')) { + inserted = true + } + return [[], []] + }, + async beginTransaction() { + calls.push('BEGIN') + }, + async commit() { + calls.push('COMMIT') + }, + async rollback() { + calls.push('ROLLBACK') + }, + release() { + calls.push('RELEASE') + }, + } as unknown as PoolConnection + const pool = { + async getConnection() { + return connection + }, + async execute(sql: string) { + calls.push(sql) + return [[], []] + }, + } as unknown as Pool + return { calls, pool } +} + +describe('createMysqlPersistence', () => { + it('sets READ COMMITTED before beginning store transactions', async () => { + const { calls, pool } = createFakePool() + + const persistence = createMysqlPersistence(pool) + await persistence.stores.publicEvents!.append({ + runId: 'run-1', + expectedSeq: 0, + event: { + type: EventType.TEXT_MESSAGE_CONTENT, + messageId: 'm1', + delta: 'hi', + timestamp: 1, + }, + }) + + const beginIndex = calls.indexOf('BEGIN') + expect(beginIndex).toBeGreaterThan(0) + expect(calls[beginIndex - 1]).toBe( + 'SET TRANSACTION ISOLATION LEVEL READ COMMITTED', + ) + }) + + it('rejects undefined bind parameter values before mysql2 execution', async () => { + const { pool } = createFakePool() + const persistence = createMysqlPersistence(pool) + + await expect( + persistence.stores.metadata!.set('scope', 'key', undefined), + ).rejects.toThrow('Unsupported MySQL bind parameter value') + }) +}) diff --git a/examples/ts-react-chat/src/lib/mysql-persistence.ts b/examples/ts-react-chat/src/lib/mysql-persistence.ts new file mode 100644 index 000000000..0ddb42885 --- /dev/null +++ b/examples/ts-react-chat/src/lib/mysql-persistence.ts @@ -0,0 +1,131 @@ +import mysql from 'mysql2/promise' +import { createSqlPersistence } from '@tanstack/ai-persistence-sql' +import type { + FieldPacket, + Pool, + PoolConnection, + PoolOptions, + QueryResult, + RowDataPacket, +} from 'mysql2/promise' +import type { ExecuteValues } from 'mysql2' +import type { SqlDriver } from '@tanstack/ai-persistence-sql' + +let pool: Pool | undefined + +interface MysqlExecutable { + execute( + sql: string, + values?: Array, + ): Promise<[T, Array]> +} + +function readPort(value: string | undefined): number | undefined { + if (!value) return undefined + const port = Number(value) + return Number.isInteger(port) && port > 0 ? port : undefined +} + +function createPoolOptions(): string | PoolOptions { + if (process.env.MYSQL_URL) return process.env.MYSQL_URL + return { + host: process.env.MYSQL_HOST || '127.0.0.1', + port: readPort(process.env.MYSQL_PORT) ?? 3306, + user: process.env.MYSQL_USER || 'root', + password: process.env.MYSQL_PASSWORD || '', + database: process.env.MYSQL_DATABASE || 'tanstack_ai_chat', + waitForConnections: true, + connectionLimit: 10, + } +} + +function getPool(): Pool { + const options = createPoolOptions() + pool ??= + typeof options === 'string' + ? mysql.createPool(options) + : mysql.createPool(options) + return pool +} + +function isExecuteValue(value: unknown): value is ExecuteValues { + if (value === null) return true + if (value === undefined) return false + const type = typeof value + if ( + type === 'string' || + type === 'number' || + type === 'bigint' || + type === 'boolean' + ) { + return true + } + if ( + value instanceof Date || + value instanceof Buffer || + value instanceof Uint8Array + ) { + return true + } + if (Array.isArray(value)) return value.every(isExecuteValue) + if (type === 'object') { + return Object.values(value).every(isExecuteValue) + } + return false +} + +function toExecuteValues(params: ReadonlyArray): Array { + return params.map((param) => { + if (!isExecuteValue(param)) { + throw new TypeError('Unsupported MySQL bind parameter value') + } + return param + }) +} + +function createDriver( + execute: MysqlExecutable, + getConnection: () => Promise, +): SqlDriver { + const driver: SqlDriver = { + dialect: 'mysql' as SqlDriver['dialect'], + async exec(sql, params = []) { + await execute.execute(sql, toExecuteValues(params)) + }, + async query = Record>( + sql: string, + params: ReadonlyArray = [], + ): Promise> { + const [rows] = await execute.execute>( + sql, + toExecuteValues(params), + ) + return rows.map((row) => ({ ...row })) as Array + }, + async transaction(fn) { + const connection = await getConnection() + const tx = createDriver(connection, getConnection) + try { + await connection.execute( + 'SET TRANSACTION ISOLATION LEVEL READ COMMITTED', + ) + await connection.beginTransaction() + const result = await fn(tx) + await connection.commit() + return result + } catch (error) { + await connection.rollback() + throw error + } finally { + connection.release() + } + }, + } + return driver +} + +export function createMysqlPersistence(mysqlPool: Pool = getPool()) { + return createSqlPersistence( + createDriver(mysqlPool, () => mysqlPool.getConnection()), + ) +} diff --git a/examples/ts-react-chat/src/routeTree.gen.ts b/examples/ts-react-chat/src/routeTree.gen.ts index 69a7764a2..03ad53861 100644 --- a/examples/ts-react-chat/src/routeTree.gen.ts +++ b/examples/ts-react-chat/src/routeTree.gen.ts @@ -13,6 +13,7 @@ import { Route as ThreadsRouteImport } from './routes/threads' import { Route as ServerFnChatRouteImport } from './routes/server-fn-chat' import { Route as SandboxesRouteImport } from './routes/sandboxes' import { Route as RealtimeRouteImport } from './routes/realtime' +import { Route as MysqlPersistenceRouteImport } from './routes/mysql-persistence' import { Route as McpDemoRouteImport } from './routes/mcp-demo' import { Route as McpAppsRouteImport } from './routes/mcp-apps' import { Route as Issue176ToolResultRouteImport } from './routes/issue-176-tool-result' @@ -36,6 +37,7 @@ import { Route as ApiSummarizeRouteImport } from './routes/api.summarize' import { Route as ApiStructuredOutputRouteImport } from './routes/api.structured-output' import { Route as ApiStructuredChatRouteImport } from './routes/api.structured-chat' import { Route as ApiSandboxTriageRouteImport } from './routes/api.sandbox-triage' +import { Route as ApiMysqlPersistentChatRouteImport } from './routes/api.mysql-persistent-chat' import { Route as ApiMcpStatusRouteImport } from './routes/api.mcp-status' import { Route as ApiMcpPoolRouteImport } from './routes/api.mcp-pool' import { Route as ApiMcpManualRouteImport } from './routes/api.mcp-manual' @@ -74,6 +76,11 @@ const RealtimeRoute = RealtimeRouteImport.update({ path: '/realtime', getParentRoute: () => rootRouteImport, } as any) +const MysqlPersistenceRoute = MysqlPersistenceRouteImport.update({ + id: '/mysql-persistence', + path: '/mysql-persistence', + getParentRoute: () => rootRouteImport, +} as any) const McpDemoRoute = McpDemoRouteImport.update({ id: '/mcp-demo', path: '/mcp-demo', @@ -192,6 +199,11 @@ const ApiSandboxTriageRoute = ApiSandboxTriageRouteImport.update({ path: '/api/sandbox-triage', getParentRoute: () => rootRouteImport, } as any) +const ApiMysqlPersistentChatRoute = ApiMysqlPersistentChatRouteImport.update({ + id: '/api/mysql-persistent-chat', + path: '/api/mysql-persistent-chat', + getParentRoute: () => rootRouteImport, +} as any) const ApiMcpStatusRoute = ApiMcpStatusRouteImport.update({ id: '/api/mcp-status', path: '/api/mcp-status', @@ -287,6 +299,7 @@ export interface FileRoutesByFullPath { '/issue-176-tool-result': typeof Issue176ToolResultRoute '/mcp-apps': typeof McpAppsRoute '/mcp-demo': typeof McpDemoRoute + '/mysql-persistence': typeof MysqlPersistenceRoute '/realtime': typeof RealtimeRoute '/sandboxes': typeof SandboxesRoute '/server-fn-chat': typeof ServerFnChatRoute @@ -302,6 +315,7 @@ export interface FileRoutesByFullPath { '/api/mcp-manual': typeof ApiMcpManualRoute '/api/mcp-pool': typeof ApiMcpPoolRoute '/api/mcp-status': typeof ApiMcpStatusRoute + '/api/mysql-persistent-chat': typeof ApiMysqlPersistentChatRoute '/api/sandbox-triage': typeof ApiSandboxTriageRoute '/api/structured-chat': typeof ApiStructuredChatRoute '/api/structured-output': typeof ApiStructuredOutputRoute @@ -333,6 +347,7 @@ export interface FileRoutesByTo { '/issue-176-tool-result': typeof Issue176ToolResultRoute '/mcp-apps': typeof McpAppsRoute '/mcp-demo': typeof McpDemoRoute + '/mysql-persistence': typeof MysqlPersistenceRoute '/realtime': typeof RealtimeRoute '/sandboxes': typeof SandboxesRoute '/server-fn-chat': typeof ServerFnChatRoute @@ -348,6 +363,7 @@ export interface FileRoutesByTo { '/api/mcp-manual': typeof ApiMcpManualRoute '/api/mcp-pool': typeof ApiMcpPoolRoute '/api/mcp-status': typeof ApiMcpStatusRoute + '/api/mysql-persistent-chat': typeof ApiMysqlPersistentChatRoute '/api/sandbox-triage': typeof ApiSandboxTriageRoute '/api/structured-chat': typeof ApiStructuredChatRoute '/api/structured-output': typeof ApiStructuredOutputRoute @@ -380,6 +396,7 @@ export interface FileRoutesById { '/issue-176-tool-result': typeof Issue176ToolResultRoute '/mcp-apps': typeof McpAppsRoute '/mcp-demo': typeof McpDemoRoute + '/mysql-persistence': typeof MysqlPersistenceRoute '/realtime': typeof RealtimeRoute '/sandboxes': typeof SandboxesRoute '/server-fn-chat': typeof ServerFnChatRoute @@ -395,6 +412,7 @@ export interface FileRoutesById { '/api/mcp-manual': typeof ApiMcpManualRoute '/api/mcp-pool': typeof ApiMcpPoolRoute '/api/mcp-status': typeof ApiMcpStatusRoute + '/api/mysql-persistent-chat': typeof ApiMysqlPersistentChatRoute '/api/sandbox-triage': typeof ApiSandboxTriageRoute '/api/structured-chat': typeof ApiStructuredChatRoute '/api/structured-output': typeof ApiStructuredOutputRoute @@ -428,6 +446,7 @@ export interface FileRouteTypes { | '/issue-176-tool-result' | '/mcp-apps' | '/mcp-demo' + | '/mysql-persistence' | '/realtime' | '/sandboxes' | '/server-fn-chat' @@ -443,6 +462,7 @@ export interface FileRouteTypes { | '/api/mcp-manual' | '/api/mcp-pool' | '/api/mcp-status' + | '/api/mysql-persistent-chat' | '/api/sandbox-triage' | '/api/structured-chat' | '/api/structured-output' @@ -474,6 +494,7 @@ export interface FileRouteTypes { | '/issue-176-tool-result' | '/mcp-apps' | '/mcp-demo' + | '/mysql-persistence' | '/realtime' | '/sandboxes' | '/server-fn-chat' @@ -489,6 +510,7 @@ export interface FileRouteTypes { | '/api/mcp-manual' | '/api/mcp-pool' | '/api/mcp-status' + | '/api/mysql-persistent-chat' | '/api/sandbox-triage' | '/api/structured-chat' | '/api/structured-output' @@ -520,6 +542,7 @@ export interface FileRouteTypes { | '/issue-176-tool-result' | '/mcp-apps' | '/mcp-demo' + | '/mysql-persistence' | '/realtime' | '/sandboxes' | '/server-fn-chat' @@ -535,6 +558,7 @@ export interface FileRouteTypes { | '/api/mcp-manual' | '/api/mcp-pool' | '/api/mcp-status' + | '/api/mysql-persistent-chat' | '/api/sandbox-triage' | '/api/structured-chat' | '/api/structured-output' @@ -567,6 +591,7 @@ export interface RootRouteChildren { Issue176ToolResultRoute: typeof Issue176ToolResultRoute McpAppsRoute: typeof McpAppsRoute McpDemoRoute: typeof McpDemoRoute + MysqlPersistenceRoute: typeof MysqlPersistenceRoute RealtimeRoute: typeof RealtimeRoute SandboxesRoute: typeof SandboxesRoute ServerFnChatRoute: typeof ServerFnChatRoute @@ -582,6 +607,7 @@ export interface RootRouteChildren { ApiMcpManualRoute: typeof ApiMcpManualRoute ApiMcpPoolRoute: typeof ApiMcpPoolRoute ApiMcpStatusRoute: typeof ApiMcpStatusRoute + ApiMysqlPersistentChatRoute: typeof ApiMysqlPersistentChatRoute ApiSandboxTriageRoute: typeof ApiSandboxTriageRoute ApiStructuredChatRoute: typeof ApiStructuredChatRoute ApiStructuredOutputRoute: typeof ApiStructuredOutputRoute @@ -635,6 +661,13 @@ declare module '@tanstack/react-router' { preLoaderRoute: typeof RealtimeRouteImport parentRoute: typeof rootRouteImport } + '/mysql-persistence': { + id: '/mysql-persistence' + path: '/mysql-persistence' + fullPath: '/mysql-persistence' + preLoaderRoute: typeof MysqlPersistenceRouteImport + parentRoute: typeof rootRouteImport + } '/mcp-demo': { id: '/mcp-demo' path: '/mcp-demo' @@ -796,6 +829,13 @@ declare module '@tanstack/react-router' { preLoaderRoute: typeof ApiSandboxTriageRouteImport parentRoute: typeof rootRouteImport } + '/api/mysql-persistent-chat': { + id: '/api/mysql-persistent-chat' + path: '/api/mysql-persistent-chat' + fullPath: '/api/mysql-persistent-chat' + preLoaderRoute: typeof ApiMysqlPersistentChatRouteImport + parentRoute: typeof rootRouteImport + } '/api/mcp-status': { id: '/api/mcp-status' path: '/api/mcp-status' @@ -927,6 +967,7 @@ const rootRouteChildren: RootRouteChildren = { Issue176ToolResultRoute: Issue176ToolResultRoute, McpAppsRoute: McpAppsRoute, McpDemoRoute: McpDemoRoute, + MysqlPersistenceRoute: MysqlPersistenceRoute, RealtimeRoute: RealtimeRoute, SandboxesRoute: SandboxesRoute, ServerFnChatRoute: ServerFnChatRoute, @@ -942,6 +983,7 @@ const rootRouteChildren: RootRouteChildren = { ApiMcpManualRoute: ApiMcpManualRoute, ApiMcpPoolRoute: ApiMcpPoolRoute, ApiMcpStatusRoute: ApiMcpStatusRoute, + ApiMysqlPersistentChatRoute: ApiMysqlPersistentChatRoute, ApiSandboxTriageRoute: ApiSandboxTriageRoute, ApiStructuredChatRoute: ApiStructuredChatRoute, ApiStructuredOutputRoute: ApiStructuredOutputRoute, diff --git a/examples/ts-react-chat/src/routes/api.mysql-persistent-chat.ts b/examples/ts-react-chat/src/routes/api.mysql-persistent-chat.ts new file mode 100644 index 000000000..22efdf7d7 --- /dev/null +++ b/examples/ts-react-chat/src/routes/api.mysql-persistent-chat.ts @@ -0,0 +1,299 @@ +import { createFileRoute } from '@tanstack/react-router' +import { + chat, + chatParamsFromRequestBody, + EventType, + maxIterations, + toServerSentEventsResponse, +} from '@tanstack/ai' +import { openaiText } from '@tanstack/ai-openai' +import { decodeCursor, withPersistence } from '@tanstack/ai-persistence' +import { createMysqlPersistence } from '@/lib/mysql-persistence' +import type { RunErrorEvent, StreamChunk } from '@tanstack/ai' +import type { RunRecord } from '@tanstack/ai-persistence' + +const SYSTEM_PROMPT = `You are a concise assistant in a durable chat demo. + +Streams are persisted to MySQL so the browser can refresh and resume an +in-progress response. Keep answers short enough that the streaming behavior is +easy to inspect.` + +const mysqlPersistence = createMysqlPersistence() +const activeRuns = new Map>() +const TERMINAL_STATUSES = new Set(['completed', 'failed', 'interrupted']) +const POLL_DELAYS_MS = [100, 150, 250, 500, 750, 1000] as const + +type ChatParams = Awaited> + +function jsonError(message: string, status: number): Response { + return new Response(JSON.stringify({ error: message }), { + status, + headers: { 'Content-Type': 'application/json' }, + }) +} + +function runError(message: string, run?: RunRecord): RunErrorEvent { + return { + type: EventType.RUN_ERROR, + timestamp: Date.now(), + message, + ...(run ? { runId: run.runId, threadId: run.threadId } : {}), + } +} + +function isTerminal(run: RunRecord): boolean { + return TERMINAL_STATUSES.has(run.status) +} + +function delay(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)) +} + +async function ensureProducer(params: ChatParams): Promise { + const runId = params.runId + const existing = activeRuns.get(runId) + if (existing) { + if (!params.resume) return + await existing.catch(() => undefined) + const current = activeRuns.get(runId) + if (current && current !== existing) return + if (current === existing) activeRuns.delete(runId) + } + + const promise = (async () => { + const stream = chat({ + adapter: openaiText('gpt-5.5'), + middleware: [ + withPersistence(mysqlPersistence, { + features: ['messages', 'durable-replay', 'interrupts'], + }), + ], + systemPrompts: [SYSTEM_PROMPT], + agentLoopStrategy: maxIterations(4), + messages: params.messages, + threadId: params.threadId, + runId: params.runId, + cursor: params.cursor, + resume: params.resume, + }) + + for await (const _chunk of stream) { + // Drain the model stream so withPersistence writes every public event to + // MySQL. Browser disconnects only cancel response tailing, not this run. + } + })() + + activeRuns.set(runId, promise) + promise + .catch((error) => { + console.error('[mysql-persistent-chat] Producer failed:', error) + }) + .finally(() => { + if (activeRuns.get(runId) === promise) activeRuns.delete(runId) + }) +} + +async function validateCursorRequest( + params: ChatParams, + decodedCursor: { runId: string; seq: number }, +): Promise { + if (decodedCursor.runId !== params.runId) { + return jsonError('Cursor runId does not match request runId.', 409) + } + if (decodedCursor.seq < 1) { + return jsonError('Cursor sequence must reference a persisted event.', 400) + } + + const run = await mysqlPersistence.stores.runs!.get(params.runId) + if (!run) { + return jsonError('Run for cursor was not found.', 404) + } + if (run.threadId !== params.threadId) { + return jsonError( + 'Cursor run threadId does not match request threadId.', + 409, + ) + } + + const latestSeq = await mysqlPersistence.stores.publicEvents!.latestSeq( + params.runId, + ) + if (latestSeq === 0) { + return jsonError('Run has no persisted public events.', 404) + } + if (latestSeq < decodedCursor.seq) { + return jsonError('Cursor is ahead of persisted public events.', 409) + } + + let foundCursorEvent = false + for await (const event of mysqlPersistence.stores.publicEvents!.read( + params.runId, + { afterSeq: decodedCursor.seq - 1 }, + )) { + foundCursorEvent = event.seq === decodedCursor.seq + break + } + if (!foundCursorEvent) { + return jsonError('Cursor event was not found.', 404) + } + + if ( + run.status === 'running' && + !params.resume && + !activeRuns.has(params.runId) + ) { + return jsonError( + 'Run is still marked running, but no in-process producer is active. Restarting producers after a server process restart is out of scope for this example.', + 409, + ) + } + + if (run.status === 'failed' && latestSeq <= decodedCursor.seq) { + return jsonError( + 'Run failed and no newer persisted terminal event is available.', + 409, + ) + } + + return null +} + +async function* tailPersistedEvents( + runId: string, + afterSeq: number, + signal: AbortSignal, +): AsyncIterable { + let seq = afterSeq + let idlePolls = 0 + + while (!signal.aborted) { + let yielded = false + for await (const persisted of mysqlPersistence.stores.publicEvents!.read( + runId, + seq > 0 ? { afterSeq: seq } : undefined, + )) { + if (signal.aborted) return + seq = persisted.seq + yielded = true + idlePolls = 0 + yield persisted.event + } + + const run = await mysqlPersistence.stores.runs!.get(runId) + const activeProducer = activeRuns.has(runId) + if (!run) { + if (activeProducer) { + if (!yielded) { + const delayMs = + POLL_DELAYS_MS[Math.min(idlePolls, POLL_DELAYS_MS.length - 1)] + idlePolls += 1 + await delay(delayMs) + } + continue + } + yield runError('Run disappeared while tailing persisted events.') + return + } + + if (run.status === 'running' && !activeProducer) { + yield runError( + 'Run is still marked running, but no in-process producer is active. Restarting producers after a server process restart is out of scope for this example.', + run, + ) + return + } + + if (isTerminal(run) && !activeProducer) { + const latestSeq = + await mysqlPersistence.stores.publicEvents!.latestSeq(runId) + if (latestSeq <= seq) { + if (run.status === 'failed' && !yielded) { + yield runError( + run.error || + 'Run failed before any newer terminal event was available.', + run, + ) + } + return + } + } + + if (!yielded) { + const delayMs = + POLL_DELAYS_MS[Math.min(idlePolls, POLL_DELAYS_MS.length - 1)] + idlePolls += 1 + await delay(delayMs) + } + } +} + +export const Route = createFileRoute('/api/mysql-persistent-chat')({ + server: { + handlers: { + POST: async ({ request }) => { + let params + try { + params = await chatParamsFromRequestBody(await request.json()) + } catch (error) { + return new Response( + error instanceof Error ? error.message : 'Bad request', + { status: 400 }, + ) + } + + try { + let decodedCursor: { runId: string; seq: number } | undefined + if (params.cursor) { + try { + decodedCursor = decodeCursor(params.cursor) + } catch { + return jsonError('Invalid cursor.', 400) + } + const invalidCursorResponse = await validateCursorRequest( + params, + decodedCursor, + ) + if (invalidCursorResponse) return invalidCursorResponse + } + const runId = decodedCursor?.runId ?? params.runId + const afterSeq = decodedCursor?.seq ?? 0 + + if (!params.cursor || params.resume) { + await ensureProducer(params) + } else { + const run = await mysqlPersistence.stores.runs!.get(runId) + if (!run) return jsonError('Run was not found.', 404) + } + + const responseAbortController = new AbortController() + request.signal.addEventListener( + 'abort', + () => responseAbortController.abort(), + { once: true }, + ) + + return toServerSentEventsResponse( + tailPersistedEvents( + runId, + afterSeq, + responseAbortController.signal, + ), + { abortController: responseAbortController }, + ) + } catch (error) { + console.error('[mysql-persistent-chat] Chat request failed:', error) + return new Response( + JSON.stringify({ + error: + error instanceof Error ? error.message : 'An error occurred', + }), + { + status: 500, + headers: { 'Content-Type': 'application/json' }, + }, + ) + } + }, + }, + }, +}) diff --git a/examples/ts-react-chat/src/routes/generations.image.tsx b/examples/ts-react-chat/src/routes/generations.image.tsx index 38c6838a6..17de96e37 100644 --- a/examples/ts-react-chat/src/routes/generations.image.tsx +++ b/examples/ts-react-chat/src/routes/generations.image.tsx @@ -3,8 +3,17 @@ import { createFileRoute } from '@tanstack/react-router' import { useGenerateImage } from '@tanstack/ai-react' import type { UseGenerateImageReturn } from '@tanstack/ai-react' import { fetchServerSentEvents } from '@tanstack/ai-client' +import type { ImageGenerateInput } from '@tanstack/ai-client' import { generateImageFn, generateImageStreamFn } from '../lib/server-fns' +function textPrompt(prompt: ImageGenerateInput['prompt']): string { + if (typeof prompt === 'string') return prompt + return prompt + .filter((part) => part.type === 'text') + .map((part) => part.content) + .join('\n') +} + function StreamingImageGeneration() { const [prompt, setPrompt] = useState('') const [numberOfImages, setNumberOfImages] = useState(1) @@ -29,7 +38,8 @@ function DirectImageGeneration() { const [numberOfImages, setNumberOfImages] = useState(1) const hookReturn = useGenerateImage({ - fetcher: (input) => generateImageFn({ data: input }), + fetcher: (input) => + generateImageFn({ data: { ...input, prompt: textPrompt(input.prompt) } }), }) return ( @@ -48,7 +58,10 @@ function ServerFnImageGeneration() { const [numberOfImages, setNumberOfImages] = useState(1) const hookReturn = useGenerateImage({ - fetcher: (input) => generateImageStreamFn({ data: input }), + fetcher: (input) => + generateImageStreamFn({ + data: { ...input, prompt: textPrompt(input.prompt) }, + }), }) return ( diff --git a/examples/ts-react-chat/src/routes/generations.structured-output.tsx b/examples/ts-react-chat/src/routes/generations.structured-output.tsx index c11d3f143..523b6ba2d 100644 --- a/examples/ts-react-chat/src/routes/generations.structured-output.tsx +++ b/examples/ts-react-chat/src/routes/generations.structured-output.tsx @@ -28,14 +28,14 @@ const PROVIDER_MODELS: Record< { value: 'gpt-5.1', label: 'GPT-5.1' }, { value: 'gpt-5', label: 'GPT-5' }, { value: 'gpt-5-mini', label: 'GPT-5 Mini' }, - { value: 'gpt-4o', label: 'GPT-4o' }, + { value: 'gpt-5.5', label: 'GPT-5.5' }, ], // OpenAI Chat Completions: same model surface, older `/v1/chat/completions` // wire format. The reasoning-summary opt-in isn't available here, so // streaming reasoning won't be surfaced for gpt-5.x even though the model // is still doing it under the hood. 'openai-chat': [ - { value: 'gpt-4o', label: 'GPT-4o' }, + { value: 'gpt-5.5', label: 'GPT-5.5' }, { value: 'gpt-5-mini', label: 'GPT-5 Mini' }, { value: 'gpt-5', label: 'GPT-5' }, { value: 'gpt-5.1', label: 'GPT-5.1' }, @@ -254,9 +254,6 @@ function StructuredOutputPage() { outputSchema: GuitarRecommendationSchema, connection: fetchServerSentEvents('/api/structured-output'), forwardedProps: { provider, model, stream }, - devtools: { - outputKind: 'structured', - }, onChunk: handleChunk, onError: (err) => { setError(err.message) diff --git a/examples/ts-react-chat/src/routes/generations.video.tsx b/examples/ts-react-chat/src/routes/generations.video.tsx index 792d975a9..c48d21625 100644 --- a/examples/ts-react-chat/src/routes/generations.video.tsx +++ b/examples/ts-react-chat/src/routes/generations.video.tsx @@ -3,8 +3,17 @@ import { createFileRoute } from '@tanstack/react-router' import { useGenerateVideo } from '@tanstack/ai-react' import type { UseGenerateVideoReturn } from '@tanstack/ai-react' import { fetchServerSentEvents } from '@tanstack/ai-client' +import type { VideoGenerateInput } from '@tanstack/ai-client' import { generateVideoFn, generateVideoStreamFn } from '../lib/server-fns' +function textPrompt(prompt: VideoGenerateInput['prompt']): string { + if (typeof prompt === 'string') return prompt + return prompt + .filter((part) => part.type === 'text') + .map((part) => part.content) + .join('\n') +} + function StreamingVideoGeneration() { const [prompt, setPrompt] = useState('') @@ -21,7 +30,8 @@ function DirectVideoGeneration() { const [prompt, setPrompt] = useState('') const hookReturn = useGenerateVideo({ - fetcher: (input) => generateVideoFn({ data: input }), + fetcher: (input) => + generateVideoFn({ data: { ...input, prompt: textPrompt(input.prompt) } }), }) return ( @@ -33,7 +43,10 @@ function ServerFnVideoGeneration() { const [prompt, setPrompt] = useState('') const hookReturn = useGenerateVideo({ - fetcher: (input) => generateVideoStreamFn({ data: input }), + fetcher: (input) => + generateVideoStreamFn({ + data: { ...input, prompt: textPrompt(input.prompt) }, + }), }) return ( diff --git a/examples/ts-react-chat/src/routes/index.tsx b/examples/ts-react-chat/src/routes/index.tsx index fb1ce78f0..f50be672f 100644 --- a/examples/ts-react-chat/src/routes/index.tsx +++ b/examples/ts-react-chat/src/routes/index.tsx @@ -28,7 +28,7 @@ import { import { clientTools } from '@tanstack/ai-client' import { ThinkingPart } from '@tanstack/ai-react-ui' import type { UIMessage } from '@tanstack/ai-react' -import type { ContentPart, TranscriptionResult } from '@tanstack/ai' +import type { ContentPart } from '@tanstack/ai' import type { GeminiInteractionsCustomEventValue } from '@tanstack/ai-gemini/experimental' import type { ModelOption } from '@/lib/model-selection' import GuitarRecommendation from '@/components/example-GuitarRecommendation' @@ -445,9 +445,7 @@ function ChatPage() { // mic button is the worst outcome. const [recordError, setRecordError] = useState(null) - const { generate: transcribe, isLoading: isTranscribing } = useTranscription< - (r: TranscriptionResult) => void - >({ + const { generate: transcribe, isLoading: isTranscribing } = useTranscription({ connection: fetchServerSentEvents('/api/transcribe'), onResult: (r) => setInput((prev) => (prev ? `${prev} ${r.text}` : r.text)), // A failed transcription (network/provider) is just as silent as a mic diff --git a/examples/ts-react-chat/src/routes/mysql-persistence.tsx b/examples/ts-react-chat/src/routes/mysql-persistence.tsx new file mode 100644 index 000000000..26602fae9 --- /dev/null +++ b/examples/ts-react-chat/src/routes/mysql-persistence.tsx @@ -0,0 +1,171 @@ +import { useState } from 'react' +import { createFileRoute } from '@tanstack/react-router' +import { fetchServerSentEvents, useChat } from '@tanstack/ai-react' +import { localStorageChatPersistence } from '@tanstack/ai-client' +import { Square } from 'lucide-react' +import type { ChatResumeSnapshot, UIMessage } from '@tanstack/ai-client' + +const THREAD_ID_KEY = 'tanstack-ai:mysql-persistence:thread-id' +const RESUME_KEY_PREFIX = 'tanstack-ai:mysql-persistence:resume:' +const MESSAGES_KEY_PREFIX = 'tanstack-ai:mysql-persistence:messages:' + +function getStableThreadId(): string { + if (typeof window === 'undefined') return 'mysql-persistence-ssr' + const existing = window.localStorage.getItem(THREAD_ID_KEY) + if (existing) return existing + const id = crypto.randomUUID() + window.localStorage.setItem(THREAD_ID_KEY, id) + return id +} + +const messagePersistence = localStorageChatPersistence>({ + keyPrefix: MESSAGES_KEY_PREFIX, +}) + +const resumePersistence = localStorageChatPersistence({ + keyPrefix: RESUME_KEY_PREFIX, +}) + +function messageText(message: UIMessage): string { + return message.parts + .map((part) => { + if (part.type === 'text') return part.content + if (part.type === 'thinking') return part.content + if (part.type === 'tool-call') return `[tool:${part.name}]` + return '' + }) + .filter(Boolean) + .join('\n') +} + +function MysqlPersistenceRoute() { + const [threadId] = useState(getStableThreadId) + const [input, setInput] = useState('') + + const { + messages, + sendMessage, + isLoading, + error, + resumeState, + stop, + clear, + } = useChat({ + id: threadId, + threadId, + connection: fetchServerSentEvents('/api/mysql-persistent-chat'), + persistence: { + client: messagePersistence, + server: resumePersistence, + }, + }) + + const handleSubmit = (event: React.FormEvent) => { + event.preventDefault() + const text = input.trim() + if (!text || isLoading) return + setInput('') + void sendMessage(text) + } + + const handleReset = () => { + clear() + messagePersistence.removeItem(threadId) + resumePersistence.removeItem(threadId) + window.localStorage.removeItem(THREAD_ID_KEY) + window.location.reload() + } + + return ( +
+
+
+
+

MySQL persistent chat

+

+ Thread {threadId.slice(0, 8)} + {resumeState ? ` · run ${resumeState.runId.slice(0, 8)}` : ''} +

+
+ +
+
+ +
+
+ {messages.length === 0 && ( +
+ Send a prompt, then refresh while the response is streaming. The + same server process continues tailing persisted MySQL events from + the saved run cursor. +
+ )} + {messages.map((message) => ( +
+
+ {message.role} +
+
+ {messageText(message)} +
+
+ ))} + {error && ( +
+ {error.message} +
+ )} +
+
+ +
+
+ setInput(event.target.value)} + placeholder="Ask for a detailed answer, then refresh mid-stream" + className="min-w-0 flex-1 rounded-lg border border-gray-700 bg-gray-900 px-4 py-2 text-white placeholder-gray-500 focus:border-orange-500/50 focus:outline-none" + /> + {isLoading ? ( + + ) : ( + + )} +
+
+
+ ) +} + +export const Route = createFileRoute('/mysql-persistence')({ + component: MysqlPersistenceRoute, +}) diff --git a/knip.json b/knip.json index b977ce034..8f9670c42 100644 --- a/knip.json +++ b/knip.json @@ -58,6 +58,9 @@ "packages/ai-vue-ui": { "ignore": ["src/use-chat-context.ts"] }, - "packages/ai-bedrock": {} + "packages/ai-bedrock": {}, + "packages/ai-persistence-postgres": { + "ignoreDependencies": ["pg"] + } } } diff --git a/packages/ai-angular/package.json b/packages/ai-angular/package.json index ed05dfd51..35c3b2404 100644 --- a/packages/ai-angular/package.json +++ b/packages/ai-angular/package.json @@ -50,7 +50,7 @@ "test:lib:dev": "vitest", "test:types": "tsc", "test:build": "publint --strict", - "build": "ng-packagr -p ng-package.json -c tsconfig.build.json && rm -rf ./dist/package.json" + "build": "ng-packagr -p ng-package.json -c tsconfig.build.json && premove ./dist/package.json" }, "dependencies": { "@tanstack/ai-client": "workspace:*", diff --git a/packages/ai-angular/src/inject-chat.ts b/packages/ai-angular/src/inject-chat.ts index 5c1453abe..a61666a72 100644 --- a/packages/ai-angular/src/inject-chat.ts +++ b/packages/ai-angular/src/inject-chat.ts @@ -80,6 +80,14 @@ export function injectChat< ? { connection: options.connection } : { fetcher: options.fetcher } + let lifecycleOwned = false + const resumeIfLifecycleOwned = () => { + if (!lifecycleOwned) { + return + } + void client.maybeAutoResume() + } + const client = new ChatClient({ devtoolsBridgeFactory: createChatDevtoolsBridge, ...transport, @@ -90,6 +98,9 @@ export function injectChat< ...(options.persistence !== undefined && { persistence: options.persistence, }), + ...(options.initialResumeSnapshot !== undefined && { + initialResumeSnapshot: options.initialResumeSnapshot, + }), ...(bodySource !== undefined && { body: bodySource() }), ...(options.threadId !== undefined && { threadId: options.threadId }), ...(forwardedPropsSource !== undefined && { @@ -106,6 +117,10 @@ export function injectChat< onChunk: (chunk: StreamChunk) => options.onChunk?.(chunk), onFinish: (message) => options.onFinish?.(message), onError: (err) => options.onError?.(err), + onResumeStateChange: (resumeState, pendingInterrupts) => { + options.onResumeStateChange?.(resumeState, pendingInterrupts) + resumeIfLifecycleOwned() + }, tools: options.tools, onCustomEvent: (eventType, data, context) => options.onCustomEvent?.(eventType, data, context), @@ -156,9 +171,27 @@ export function injectChat< ) } - afterNextRender(() => client.mountDevtools(), { injector }) + afterNextRender( + () => { + lifecycleOwned = true + client.mountDevtools() + void client.maybeAutoResume() + if (typeof window !== 'undefined') { + window.addEventListener('online', handleOnline) + } + }, + { injector }, + ) + + const handleOnline = () => { + void client.maybeAutoResume() + } destroyRef.onDestroy(() => { + lifecycleOwned = false + if (typeof window !== 'undefined') { + window.removeEventListener('online', handleOnline) + } if (liveSource?.()) { client.unsubscribe() } else { diff --git a/packages/ai-angular/tests/inject-chat.test.ts b/packages/ai-angular/tests/inject-chat.test.ts index b7d0bf710..098fa9f16 100644 --- a/packages/ai-angular/tests/inject-chat.test.ts +++ b/packages/ai-angular/tests/inject-chat.test.ts @@ -1,8 +1,9 @@ import { describe, expect, it, vi } from 'vitest' import { z } from 'zod' +import { EventType } from '@tanstack/ai/client' import { Component, signal } from '@angular/core' import { TestBed } from '@angular/core/testing' -import { ChatClient } from '@tanstack/ai-client' +import { ChatClient, type ConnectionAdapter } from '@tanstack/ai-client' import { injectChat } from '../src/inject-chat' import { createMockConnectionAdapter, @@ -120,6 +121,86 @@ describe('injectChat — reactive options', () => { }) }) +describe('injectChat — resume', () => { + it('forwards onResumeStateChange to ChatClient', async () => { + const onResumeStateChange = vi.fn() + const adapter = createMockConnectionAdapter({ + chunks: [ + { + type: EventType.RUN_STARTED, + runId: 'run-1', + threadId: 'thread-1', + timestamp: Date.now(), + }, + { + type: EventType.TEXT_MESSAGE_CONTENT, + messageId: 'msg-1', + timestamp: Date.now(), + delta: 'Hi', + cursor: 'cursor-1', + }, + { + type: EventType.RUN_FINISHED, + runId: 'run-1', + threadId: 'thread-1', + timestamp: Date.now(), + outcome: { type: 'success' }, + }, + ], + }) + const { result } = renderInjectChat({ + connection: adapter, + threadId: 'thread-1', + onResumeStateChange, + }) + + await result.sendMessage('Hi') + + expect(onResumeStateChange).toHaveBeenCalledWith( + expect.objectContaining({ + threadId: 'thread-1', + runId: expect.any(String), + cursor: 'cursor-1', + }), + [], + ) + }) + + it('forwards initialResumeSnapshot to ChatClient auto-resume', async () => { + type RunContext = Parameters['connect']>[3] + const contexts: Array = [] + const adapter: Extract = { + async *connect(_messages, _data, _signal, runContext) { + contexts.push(runContext) + }, + } + const { flush } = renderInjectChat({ + connection: adapter, + initialResumeSnapshot: { + resumeState: { + threadId: 'thread-1', + runId: 'run-1', + cursor: 'cursor-1', + }, + pendingInterrupts: [], + }, + }) + + flush() + await tick() + + expect(contexts).toEqual([ + expect.objectContaining({ + threadId: 'thread-1', + runId: 'run-1', + cursor: 'cursor-1', + }), + ]) + }) +}) + describe('injectChat — structured output', () => { // Mount injectChat directly so the `outputSchema` generic flows through and // the schema-gated `partial` / `final` signals are present on the result. diff --git a/packages/ai-client/src/chat-client.ts b/packages/ai-client/src/chat-client.ts index d5ed91a8b..59329a260 100644 --- a/packages/ai-client/src/chat-client.ts +++ b/packages/ai-client/src/chat-client.ts @@ -17,6 +17,7 @@ import type { AnyClientTool, ContentPart, ModelMessage, + RunAgentResumeItem, StreamChunk, } from '@tanstack/ai/client' import type { @@ -34,8 +35,14 @@ import type { } from './devtools' import type { ChatClientOptions, + ChatClientPersistence, ChatClientState, ChatFetcher, + ChatPendingInterrupt, + ChatPersistenceOptions, + ChatResumeState, + ChatResumeSnapshot, + ChatServerPersistence, ConnectionStatus, MessagePart, MultimodalContent, @@ -59,6 +66,10 @@ type ChatClientUpdateOptionsWithoutContext< onSubscriptionChange?: (isSubscribed: boolean) => void onConnectionStatusChange?: (status: ConnectionStatus) => void onSessionGeneratingChange?: (isGenerating: boolean) => void + onResumeStateChange?: ( + resumeState: ChatResumeState | null, + pendingInterrupts: Array, + ) => void onCustomEvent?: ( eventType: string, data: unknown, @@ -89,6 +100,21 @@ function resolveTransport(transport: { throw new Error('ChatClient: either `connection` or `fetcher` is required.') } +function normalizePersistence>( + persistence: ChatPersistenceOptions | undefined, +): { + client?: ChatClientPersistence + server?: ChatServerPersistence +} { + if (!persistence) { + return {} + } + if ('getItem' in persistence) { + return { client: persistence } + } + return persistence +} + export class ChatClient< TTools extends ReadonlyArray = any, TContext = unknown, @@ -101,7 +127,28 @@ export class ChatClient< // chunks after a mid-stream clear) live in ChatPersistor so this class stays // focused on streaming. Undefined when no `persistence` adapter is configured. private readonly persistor?: ChatPersistor + private readonly serverPersistence?: ChatServerPersistence + private serverPersistenceGeneration = 0 + private disposed = false private currentRunId: string | null = null + // Resume tracking: the latest in-band cursor seen for the active run, so a + // reconnect can replay events after it. Cleared when the run terminates. + private lastResume: ChatResumeState | null = null + private pendingInterrupts: Array = [] + private pendingInterruptRunId: string | null = null + private readonly pendingInterruptResumeItems = new Map< + string, + RunAgentResumeItem + >() + private readonly autoResume: boolean + // When set, the next streamResponse() resumes this run/cursor instead of + // starting a fresh run (consumed once). + private pendingResumeRunId: string | null = null + private pendingResumeThreadId: string | null = null + private pendingResumeCursor: string | null = null + private pendingResumeItems: Array | null = null + private activeResumeThreadId: string | null = null + private activeResumeRunId: string | null = null // Track the legacy `body` option and the canonical `forwardedProps` // option as separate slots so that `updateOptions({ forwardedProps })` // doesn't wipe a previously-set `body` (and vice versa). They are @@ -144,6 +191,8 @@ export class ChatClient< private draining = false private sessionGenerating = false private readonly activeRunIds = new Set() + private readonly clearedRunIds = new Set() + private clearedRunlessRunId: string | null = null private devtoolsMounted = false private readonly callbacksRef: { @@ -159,6 +208,10 @@ export class ChatClient< onSubscriptionChange: (isSubscribed: boolean) => void onConnectionStatusChange: (status: ConnectionStatus) => void onSessionGeneratingChange: (isGenerating: boolean) => void + onResumeStateChange: ( + resumeState: ChatResumeState | null, + pendingInterrupts: Array, + ) => void onCustomEvent: ( eventType: string, data: unknown, @@ -170,13 +223,16 @@ export class ChatClient< constructor(options: ChatClientOptions) { this.uniqueId = options.id || this.generateUniqueId('chat') this.threadId = options.threadId || this.generateUniqueId('thread') - if (options.persistence) { + this.autoResume = options.autoResume ?? true + const persistence = normalizePersistence(options.persistence) + if (persistence.client) { this.persistor = new ChatPersistor( - options.persistence, + persistence.client, this.uniqueId, (messages) => this.processor.setMessages(messages), ) } + this.serverPersistence = persistence.server // Both `body` (deprecated) and `forwardedProps` populate the AG-UI // `RunAgentInput.forwardedProps` wire field. They are stored // separately so `updateOptions` can replace one without touching the @@ -215,10 +271,19 @@ export class ChatClient< options.onConnectionStatusChange || (() => {}), onSessionGeneratingChange: options.onSessionGeneratingChange || (() => {}), + onResumeStateChange: options.onResumeStateChange || (() => {}), onCustomEvent: options.onCustomEvent || (() => {}), }, } + const storedResumeSnapshot = options.initialResumeSnapshot + ? undefined + : this.readInitialResumeSnapshot() + const resumeSnapshot = options.initialResumeSnapshot ?? storedResumeSnapshot + if (resumeSnapshot && !(resumeSnapshot instanceof Promise)) { + this.applyResumeSnapshot(resumeSnapshot) + } + // Create StreamProcessor with event handlers. // Use conditional spreads so we don't pass `undefined` into // `StreamProcessorOptions` fields under `exactOptionalPropertyTypes`. @@ -433,6 +498,52 @@ export class ChatClient< }) this.persistor?.hydrateAsync(persistedMessages) + if ( + !options.initialResumeSnapshot && + storedResumeSnapshot instanceof Promise + ) { + this.hydrateResumeSnapshotAsync(storedResumeSnapshot) + } + } + + private readInitialResumeSnapshot(): + | ChatResumeSnapshot + | null + | undefined + | Promise { + try { + return this.serverPersistence?.getItem(this.threadId) + } catch { + return undefined + } + } + + private hydrateResumeSnapshotAsync( + snapshot: Promise, + ): void { + const generation = this.serverPersistenceGeneration + snapshot + .then((resolvedSnapshot) => { + if ( + resolvedSnapshot && + !this.lastResume && + !this.disposed && + generation === this.serverPersistenceGeneration + ) { + this.applyResumeSnapshot(resolvedSnapshot) + this.notifyResumeStateChange() + } + }) + .catch(() => { + // Persistence adapters are best-effort and must not break chat setup. + }) + } + + private applyResumeSnapshot(snapshot: ChatResumeSnapshot): void { + this.lastResume = { ...snapshot.resumeState } + this.pendingInterrupts = [...(snapshot.pendingInterrupts ?? [])] + this.pendingInterruptRunId = + this.pendingInterrupts.length > 0 ? this.lastResume.runId : null } mountDevtools(): void { @@ -445,17 +556,40 @@ export class ChatClient< } /** - * Drain a runId-less RUN_ERROR that belongs to a cleared run the client is - * still tracking. The persistor owns the cleared-run bookkeeping; the client - * owns the active-run / session / processing state. + * Retire cleared-run tracking for ignored terminal chunks without processing + * their stale message or resume payloads. */ - private drainIgnoredRunlessChunk(chunk: StreamChunk): void { - if (chunk.type !== 'RUN_ERROR') return - const runId = this.persistor?.takeRunlessRunId() + private retireIgnoredClearedTerminalChunk(chunk: StreamChunk): void { + if (chunk.type !== 'RUN_FINISHED' && chunk.type !== 'RUN_ERROR') return + const runId = + getChunkRunId(chunk) ?? + this.persistor?.takeRunlessRunId() ?? + this.clearedRunlessRunId if (!runId) return + this.clearedRunIds.delete(runId) + if (this.clearedRunlessRunId === runId) { + this.clearedRunlessRunId = null + } this.activeRunIds.delete(runId) this.setSessionGenerating(this.activeRunIds.size > 0) - this.resolveProcessing() + if (!getChunkRunId(chunk)) { + this.resolveProcessing() + } + } + + private shouldIgnoreClearedChunk(chunk: StreamChunk): boolean { + if (this.persistor) { + return false + } + const runId = getChunkRunId(chunk) + if (runId && this.clearedRunIds.has(runId)) { + this.clearedRunlessRunId = runId + return true + } + if (!runId && this.clearedRunlessRunId) { + return true + } + return false } private updateRunLifecycle( @@ -464,6 +598,11 @@ export class ChatClient< ): void { if (chunk.type === 'RUN_STARTED') { const chunkRunId = getChunkRunId(chunk) ?? chunk.runId + this.activeResumeThreadId = + 'threadId' in chunk && typeof chunk.threadId === 'string' + ? chunk.threadId + : this.activeResumeThreadId + this.activeResumeRunId = chunkRunId this.activeRunIds.add(chunkRunId) this.persistor?.onRunStarted(chunkRunId) this.setSessionGenerating(true) @@ -489,6 +628,159 @@ export class ChatClient< } } + /** + * Observe the in-band resume cursor on each chunk so a reconnect can replay + * after the last seen event. Cleared when the run reaches a terminal event. + */ + private observeResumeCursor(chunk: StreamChunk): void { + if (chunk.type === 'RUN_FINISHED' || chunk.type === 'RUN_ERROR') { + // A server-signaled terminal event completes the run — drop its resume + // state. (A stream that merely ends without a terminal is an interruption + // and keeps its resume state so it can be continued.) + const runId = getChunkRunId(chunk) + const threadId = + 'threadId' in chunk && typeof chunk.threadId === 'string' + ? chunk.threadId + : this.activeResumeThreadId + const cursor = + 'cursor' in chunk && typeof chunk.cursor === 'string' + ? chunk.cursor + : undefined + if ( + chunk.type === 'RUN_FINISHED' && + chunk.outcome?.type === 'interrupt' + ) { + const interruptedRunId = + this.currentRunId && + (cursor || this.lastResume?.runId === this.currentRunId) + ? this.currentRunId + : (runId ?? this.activeResumeRunId ?? this.currentRunId ?? '') + const resumeTarget = + this.lastResume?.runId === interruptedRunId ? this.lastResume : null + if (cursor) { + this.lastResume = { + threadId: threadId ?? this.threadId, + runId: interruptedRunId, + cursor, + } + } else if (!resumeTarget) { + this.pendingInterrupts = [] + this.pendingInterruptRunId = null + this.pendingInterruptResumeItems.clear() + this.notifyResumeStateChange() + return + } + this.pendingInterruptRunId = interruptedRunId + this.pendingInterrupts = [...chunk.outcome.interrupts] + this.pendingInterruptResumeItems.clear() + this.notifyResumeStateChange() + return + } + + const isRunlessSessionError = chunk.type === 'RUN_ERROR' && !runId + const isTrackedRunTerminal = Boolean( + runId && this.lastResume?.runId === runId, + ) + const isPendingInterruptRunTerminal = Boolean( + runId && this.pendingInterruptRunId === runId, + ) + const isCurrentRunTerminal = Boolean( + (runId && this.currentRunId === runId) || + (this.currentRunId && this.lastResume?.runId === this.currentRunId), + ) + const isCurrentStreamTerminal = + this.isLoading && chunk.type === 'RUN_FINISHED' && !runId + if ( + isRunlessSessionError || + isTrackedRunTerminal || + isPendingInterruptRunTerminal || + isCurrentRunTerminal || + isCurrentStreamTerminal + ) { + this.lastResume = null + } + if ( + isRunlessSessionError || + isTrackedRunTerminal || + isPendingInterruptRunTerminal || + isCurrentRunTerminal || + isCurrentStreamTerminal + ) { + this.pendingInterrupts = [] + this.pendingInterruptRunId = null + this.pendingInterruptResumeItems.clear() + } + this.notifyResumeStateChange() + return + } + const cursor = + 'cursor' in chunk && typeof chunk.cursor === 'string' + ? chunk.cursor + : undefined + if (cursor && this.currentRunId) { + this.lastResume = { + threadId: this.activeResumeThreadId ?? this.threadId, + runId: this.currentRunId, + cursor, + } + this.notifyResumeStateChange() + } + } + + /** + * The resume state for the active/interrupted run (the run id plus the last + * cursor seen), or null when there is nothing to resume. Apps can persist this + * to resume across a full reload; in-session reconnects use it automatically + * via {@link maybeAutoResume}. + */ + getResumeState(): ChatResumeState | null { + return this.lastResume ? { ...this.lastResume } : null + } + + getPendingInterrupts(): Array { + return [...this.pendingInterrupts] + } + + /** + * Resume a run by replaying its persisted events after the last cursor, then + * continuing live — without re-sending messages. Uses the supplied state, or + * the tracked in-session state. No-op (returns false) when there is nothing to + * resume or a stream is already in flight. + */ + resume(state?: ChatResumeState): Promise { + const target = state ?? this.lastResume + if (!target || this.isLoading) return Promise.resolve(false) + this.pendingResumeThreadId = target.threadId + this.pendingResumeRunId = target.runId + this.pendingResumeCursor = target.cursor + return this.streamResponse() + } + + resumeInterrupts( + resume: Array, + state?: ChatResumeState, + ): Promise { + const target = state ?? this.lastResume + if (!target || this.isLoading) return Promise.resolve(false) + this.pendingResumeThreadId = target.threadId + this.pendingResumeRunId = target.runId + this.pendingResumeCursor = target.cursor + this.pendingResumeItems = [...resume] + return this.streamResponse() + } + + /** + * Auto-resume hook for framework integrations to call on mount / when the tab + * comes back online. Honors the `autoResume` option (default true) and only + * fires when an interrupted run is tracked and no stream is in flight. + */ + maybeAutoResume(): Promise { + if (!this.autoResume || this.isLoading || !this.lastResume) { + return Promise.resolve(false) + } + return this.resume() + } + private generateUniqueId(prefix: string): string { return `${prefix}-${Date.now()}-${Math.random().toString(36).substring(7)}` } @@ -524,6 +816,78 @@ export class ChatClient< this.devtoolsBridge.emitSnapshot() } + private notifyResumeStateChange(): void { + const resumeState = this.getResumeState() + const pendingInterrupts = this.getPendingInterrupts() + this.persistResumeSnapshot(resumeState, pendingInterrupts) + this.callbacksRef.current.onResumeStateChange( + resumeState, + pendingInterrupts, + ) + } + + private persistResumeSnapshot( + resumeState: ChatResumeState | null, + pendingInterrupts: Array, + ): void { + if (!this.serverPersistence) { + return + } + + try { + const generation = ++this.serverPersistenceGeneration + const result = resumeState + ? this.serverPersistence.setItem(this.threadId, { + resumeState, + pendingInterrupts, + }) + : this.serverPersistence.removeItem(this.threadId) + if (result instanceof Promise) { + result + .catch(() => { + // Persistence adapters are best-effort and must not break chat. + }) + .finally(() => { + if (generation !== this.serverPersistenceGeneration) { + this.reconcileStaleServerPersistenceWrite() + } + }) + } + } catch { + // Persistence adapters are best-effort and must not break chat. + } + } + + private reconcileStaleServerPersistenceWrite(): void { + if (!this.serverPersistence) { + return + } + + try { + const generation = this.serverPersistenceGeneration + const resumeState = this.getResumeState() + const result = resumeState + ? this.serverPersistence.setItem(this.threadId, { + resumeState, + pendingInterrupts: this.getPendingInterrupts(), + }) + : this.serverPersistence.removeItem(this.threadId) + if (result instanceof Promise) { + result + .catch(() => { + // Persistence adapters are best-effort and must not break chat. + }) + .finally(() => { + if (generation !== this.serverPersistenceGeneration) { + this.reconcileStaleServerPersistenceWrite() + } + }) + } + } catch { + // Persistence adapters are best-effort and must not break chat. + } + } + private resetSessionGenerating(): void { this.activeRunIds.clear() this.persistor?.resetIgnored() @@ -675,14 +1039,15 @@ export class ChatClient< if (this.connectionStatus === 'connecting') { this.setConnectionStatus('connected') } - const shouldIgnore = this.persistor?.shouldIgnoreChunk(chunk) ?? false + const shouldIgnore = + this.shouldIgnoreClearedChunk(chunk) || + (this.persistor?.shouldIgnoreChunk(chunk) ?? false) if (shouldIgnore) { if (chunk.type === 'RUN_FINISHED' || chunk.type === 'RUN_ERROR') { if (getChunkRunId(chunk)) { this.updateRunLifecycle(chunk, { resolveProcessing: false }) - } else { - this.drainIgnoredRunlessChunk(chunk) } + this.retireIgnoredClearedTerminalChunk(chunk) } continue } @@ -696,6 +1061,7 @@ export class ChatClient< // per-run error only clears that run, while a runId-less RUN_ERROR is // treated as a session-level error that clears every active run. this.updateRunLifecycle(chunk) + this.observeResumeCursor(chunk) // Yield control back to event loop for UI updates await new Promise((resolve) => setTimeout(resolve, 0)) } @@ -764,7 +1130,7 @@ export class ChatClient< * ], * id: 'custom-message-id' * }, - * { model: 'gpt-4-audio' } + * { model: 'gpt-5.5' } * ) * ``` */ @@ -777,6 +1143,11 @@ export class ChatClient< if (emptyMessage || this.isLoading) { return } + if (this.pendingInterrupts.length > 0 && this.lastResume) { + throw new Error( + 'ChatClient: cannot send normal input while pending interrupts exist. Use resumeInterrupts() instead.', + ) + } // Normalize input to extract content, id, and validate const normalizedContent = this.normalizeMessageInput(content) @@ -812,6 +1183,11 @@ export class ChatClient< */ async append(message: UIMessage | ModelMessage): Promise { this.mountDevtools() + if (this.pendingInterrupts.length > 0 && this.lastResume) { + throw new Error( + 'ChatClient: cannot append normal input while pending interrupts exist. Use resumeInterrupts() instead.', + ) + } // Normalize the message to ensure it has id and createdAt const normalizedMessage = normalizeToUIMessage(message, generateMessageId) @@ -854,8 +1230,24 @@ export class ChatClient< // Track generation so a superseded stream's cleanup doesn't clobber the new one const generation = ++this.streamGeneration - const runId = `run-${Date.now()}-${Math.random().toString(36).slice(2, 8)}` + // Resuming reuses the original runId so the server replays that run's events. + const resumeThreadId = this.pendingResumeThreadId + const resumeRunId = this.pendingResumeRunId + const resumeCursor = this.pendingResumeCursor + const resumeItems = this.pendingResumeItems + const isResumeRequest = Boolean( + resumeThreadId || resumeRunId || resumeCursor || resumeItems, + ) + this.pendingResumeThreadId = null + this.pendingResumeRunId = null + this.pendingResumeCursor = null + this.pendingResumeItems = null + const runId = + resumeRunId ?? + `run-${Date.now()}-${Math.random().toString(36).slice(2, 8)}` this.currentRunId = runId + this.activeResumeThreadId = resumeThreadId ?? this.threadId + this.activeResumeRunId = runId this.setIsLoading(true) this.setStatus('submitted') @@ -874,7 +1266,7 @@ export class ChatClient< try { // Get UIMessages with parts (preserves approval state and client tool results) - const messages = this.processor.getMessages() + const messages = isResumeRequest ? [] : this.processor.getMessages() const clientTools = new Map(this.clientToolsRef.current) const runtimeContext = this.context @@ -935,7 +1327,7 @@ export class ChatClient< // JSON Schema; sending a Standard Schema instance directly would // serialize to an unusable shape. const runContext = { - threadId: this.threadId, + threadId: resumeThreadId ?? this.threadId, runId, clientTools: Array.from(clientTools.values()).map((t) => ({ name: t.name, @@ -945,8 +1337,10 @@ export class ChatClient< : { type: 'object' }, })), forwardedProps: { ...mergedBody }, + ...(resumeCursor ? { cursor: resumeCursor } : {}), + ...(resumeItems ? { resume: resumeItems } : {}), } - this.devtoolsBridge.beginRun(runContext.runId, this.threadId) + this.devtoolsBridge.beginRun(runContext.runId, runContext.threadId) activeDevtoolsRunId = runContext.runId this.devtoolsBridge.emitRunLifecycle( 'run:created', @@ -963,6 +1357,10 @@ export class ChatClient< // Send through normalized connection (pushes chunks to subscription queue) await this.connection.send(messages, mergedBody, signal, runContext) + if (generation !== this.streamGeneration || signal.aborted) { + return false + } + // Wait for subscription loop to finish processing all chunks await processingComplete @@ -1164,24 +1562,41 @@ export class ChatClient< * Clear all messages */ clear(): void { + const hadLocalStream = this.abortController !== null + for (const runId of this.activeRunIds) { + this.clearedRunIds.add(runId) + } + if (this.currentRunId) { + this.clearedRunIds.add(this.currentRunId) + this.clearedRunlessRunId = this.currentRunId + } if (this.persistor) { this.persistor.snapshotClear({ messages: this.processor.getMessages(), activeRunIds: this.activeRunIds, currentRunId: this.currentRunId, }) - if (this.isLoading) { - this.cancelInFlightStream({ setReadyStatus: true }) - this.resetSessionGenerating() - } else if (this.activeRunIds.size > 0) { - this.resetSessionGenerating() - } // Suppress persisting the empty snapshot that clearMessages emits, then // remove the stored conversation outright. this.persistor.beginClear() } + if (this.isLoading || hadLocalStream) { + this.cancelInFlightStream({ setReadyStatus: true }) + this.resetSessionGenerating() + } else if (this.activeRunIds.size > 0) { + this.resetSessionGenerating() + } this.processor.clearMessages() this.persistor?.remove() + this.lastResume = null + this.pendingInterrupts = [] + this.pendingInterruptRunId = null + this.pendingInterruptResumeItems.clear() + this.pendingResumeThreadId = null + this.pendingResumeRunId = null + this.pendingResumeCursor = null + this.pendingResumeItems = null + this.notifyResumeStateChange() this.setError(undefined) this.events.messagesCleared() } @@ -1234,6 +1649,35 @@ export class ChatClient< : undefined, ) + const pendingInterrupt = this.pendingInterrupts.find( + (interrupt) => + interrupt.toolCallId === result.toolCallId || + interrupt.id === result.toolCallId, + ) + if (pendingInterrupt && this.lastResume) { + const resumeItem: RunAgentResumeItem = { + interruptId: pendingInterrupt.id, + status: 'resolved', + payload: + result.state === 'output-error' + ? { error: result.errorText || 'Tool execution failed' } + : result.output, + } + const resumeItems = this.collectPendingInterruptResumeItems(resumeItem) + if (!resumeItems) { + return + } + if (this.isLoading) { + this.queuePostStreamAction(async () => { + await this.resumeInterrupts(resumeItems) + }) + return + } + + await this.resumeInterrupts(resumeItems) + return + } + // If stream is in progress, queue continuation check for after it ends if (this.isLoading) { this.queuePostStreamAction(() => this.checkForContinuation()) @@ -1261,6 +1705,9 @@ export class ChatClient< id: string // approval.id, not toolCallId approved: boolean }): Promise { + const pendingInterrupt = this.pendingInterrupts.find( + (interrupt) => interrupt.id === response.id, + ) // Find the tool call ID from the approval ID const messages = this.processor.getMessages() let foundToolCallId: string | undefined @@ -1288,6 +1735,25 @@ export class ChatClient< this.processor.addToolApprovalResponse(response.id, response.approved) this.devtoolsBridge.emitSnapshot() + if (pendingInterrupt && this.lastResume) { + const resumeItems = this.collectPendingInterruptResumeItems({ + interruptId: response.id, + status: response.approved ? 'resolved' : 'cancelled', + payload: { approved: response.approved }, + }) + if (!resumeItems) { + return + } + if (this.isLoading) { + this.queuePostStreamAction(async () => { + await this.resumeInterrupts(resumeItems) + }) + return + } + await this.resumeInterrupts(resumeItems) + return + } + // If stream is in progress, queue continuation check for after it ends if (this.isLoading) { this.queuePostStreamAction(() => this.checkForContinuation()) @@ -1297,6 +1763,23 @@ export class ChatClient< await this.checkForContinuation() } + private collectPendingInterruptResumeItems( + resumeItem: RunAgentResumeItem, + ): Array | null { + this.pendingInterruptResumeItems.set(resumeItem.interruptId, resumeItem) + const resumeItems: Array = [] + for (const interrupt of this.pendingInterrupts) { + const answeredInterrupt = this.pendingInterruptResumeItems.get( + interrupt.id, + ) + if (!answeredInterrupt) { + return null + } + resumeItems.push(answeredInterrupt) + } + return resumeItems + } + /** * Queue an action to be executed after the current stream ends */ @@ -1512,12 +1995,19 @@ export class ChatClient< this.callbacksRef.current.onSessionGeneratingChange = options.onSessionGeneratingChange } + if (options.onResumeStateChange !== undefined) { + this.callbacksRef.current.onResumeStateChange = + options.onResumeStateChange + } if (options.onCustomEvent !== undefined) { this.callbacksRef.current.onCustomEvent = options.onCustomEvent } } dispose(): void { + this.disposed = true + this.serverPersistenceGeneration++ + this.persistor?.dispose() this.unsubscribe() this.devtoolsBridge.dispose() this.devtoolsMounted = false diff --git a/packages/ai-client/src/client-persistor.ts b/packages/ai-client/src/client-persistor.ts index 517aaf0a7..d2305ea36 100644 --- a/packages/ai-client/src/client-persistor.ts +++ b/packages/ai-client/src/client-persistor.ts @@ -1,6 +1,47 @@ import { getChunkRunId } from './connection-adapters' import type { StreamChunk } from '@tanstack/ai/client' -import type { ChatClientPersistence, UIMessage } from './types' +import type { + ChatClientPersistence, + ChatStorageAdapter, + UIMessage, +} from './types' + +export interface LocalStorageChatPersistenceOptions { + keyPrefix?: string + serialize?: (value: TValue) => string + deserialize?: (value: string) => TValue +} + +export function localStorageChatPersistence( + options: LocalStorageChatPersistenceOptions = {}, +): ChatStorageAdapter { + const { + keyPrefix = 'tanstack-ai:', + serialize = JSON.stringify, + deserialize = JSON.parse, + } = options + + const getStorage = (): Storage | undefined => + typeof globalThis.localStorage === 'undefined' + ? undefined + : globalThis.localStorage + const getKey = (id: string) => `${keyPrefix}${id}` + + return { + getItem(id) { + const storage = getStorage() + if (!storage) return undefined + const item = storage.getItem(getKey(id)) + return item === null ? null : deserialize(item) + }, + setItem(id, value) { + getStorage()?.setItem(getKey(id), serialize(value)) + }, + removeItem(id) { + getStorage()?.removeItem(getKey(id)) + }, + } +} // `StreamChunk` is a discriminated union; `toolCallId` / `messageId` / // `parentMessageId` exist on only some members. Narrow with `in` (matching @@ -115,6 +156,12 @@ export class ChatPersistor { }) } + /** Invalidate in-flight hydration and queued persistence operations. */ + dispose(): void { + this.messagesGeneration++ + this.generation++ + } + /** * Record a message-list change and queue a `setItem` write for it. Skips a * single write after {@link beginClear} so the clear's empty snapshot isn't diff --git a/packages/ai-client/src/connection-adapters.ts b/packages/ai-client/src/connection-adapters.ts index 3c4010047..e427e7a1b 100644 --- a/packages/ai-client/src/connection-adapters.ts +++ b/packages/ai-client/src/connection-adapters.ts @@ -6,6 +6,7 @@ import { import { parseSseDataLine } from './sse-utils' import type { ModelMessage, + RunAgentResumeItem, RunErrorEvent, RunFinishedEvent, StreamChunk, @@ -199,6 +200,14 @@ export interface RunAgentInputContext { threadId: string runId: string parentRunId?: string + /** + * Resume cursor. When set, the request resumes `runId` — the server replays + * persisted events after this cursor (see `chat({ cursor })`). On a resume the + * client sends no new messages. + */ + cursor?: string + /** AG-UI interrupt resume entries returned to the server on a follow-up run. */ + resume?: Array /** Client-declared tools to advertise in the request payload. */ clientTools?: Array<{ name: string @@ -443,6 +452,8 @@ function buildRunAgentInputBody( ...(runContext?.parentRunId !== undefined && { parentRunId: runContext.parentRunId, }), + ...(runContext?.cursor !== undefined && { cursor: runContext.cursor }), + ...(runContext?.resume !== undefined && { resume: runContext.resume }), state: {}, messages: wireMessages, tools: runContext?.clientTools ?? [], @@ -482,7 +493,7 @@ function buildRunAgentInputBody( * const connection = fetchServerSentEvents('/api/chat', async () => ({ * body: { * provider: 'openai', - * model: 'gpt-4o', + * model: 'gpt-5.5', * } * })); * ``` @@ -566,7 +577,7 @@ export function fetchServerSentEvents( * const connection = fetchHttpStream('/api/chat', async () => ({ * body: { * provider: 'openai', - * model: 'gpt-4o', + * model: 'gpt-5.5', * } * })); * ``` @@ -941,13 +952,14 @@ export function stream( messages: Array | Array, data?: Record, abortSignal?: AbortSignal, + runContext?: RunAgentInputContext, ) => AsyncIterable, ): ConnectConnectionAdapter { return { - async *connect(messages, data, abortSignal) { + async *connect(messages, data, abortSignal, runContext) { // Pass messages as-is (UIMessages with parts preserved) // Server-side chat() handles conversion to ModelMessages - yield* streamFactory(messages, data, abortSignal) + yield* streamFactory(messages, data, abortSignal, runContext) }, } } @@ -982,6 +994,8 @@ export function fetcherToConnectionAdapter( data, threadId: runContext.threadId, runId: runContext.runId, + ...(runContext.cursor !== undefined && { cursor: runContext.cursor }), + ...(runContext.resume !== undefined && { resume: runContext.resume }), }, { signal: abortSignal }, ) @@ -1047,13 +1061,14 @@ export function rpcStream( messages: Array | Array, data?: Record, abortSignal?: AbortSignal, + runContext?: RunAgentInputContext, ) => AsyncIterable, ): ConnectConnectionAdapter { return { - async *connect(messages, data, abortSignal) { + async *connect(messages, data, abortSignal, runContext) { // Pass messages as-is (UIMessages with parts preserved) // Server-side chat() handles conversion to ModelMessages - yield* rpcCall(messages, data, abortSignal) + yield* rpcCall(messages, data, abortSignal, runContext) }, } } diff --git a/packages/ai-client/src/index.ts b/packages/ai-client/src/index.ts index bd9270ff6..1c414824a 100644 --- a/packages/ai-client/src/index.ts +++ b/packages/ai-client/src/index.ts @@ -21,9 +21,15 @@ export type { ThinkingPart, StructuredOutputPart, // Client configuration types + ChatStorageAdapter, ChatClientPersistence, + ChatServerPersistence, + ChatPersistenceOptions, ChatClientOptions, + ChatPendingInterrupt, ClientContextOptionFromTools, + ChatResumeState, + ChatResumeSnapshot, ChatRequestBody, InferChatMessages, InferredClientContext, @@ -36,6 +42,8 @@ export type { DistributedOmit, MultimodalContent, } from './types' +export { localStorageChatPersistence } from './client-persistor' +export type { LocalStorageChatPersistenceOptions } from './client-persistor' // Generation client types export type { InferGenerationOutput, @@ -72,7 +80,12 @@ export type { ExtractToolInput, ExtractToolOutput, } from './tool-types' -export type { AnyClientTool } from '@tanstack/ai/client' +export type { + AnyClientTool, + Interrupt, + RunAgentResumeItem, + RunFinishedOutcome, +} from '@tanstack/ai/client' export type { RealtimeAdapter, RealtimeConnection, diff --git a/packages/ai-client/src/types.ts b/packages/ai-client/src/types.ts index 25e07c42b..c73ce1ed1 100644 --- a/packages/ai-client/src/types.ts +++ b/packages/ai-client/src/types.ts @@ -7,7 +7,9 @@ import type { ImagePart, InferToolInput, InferToolOutput, + Interrupt, ModelMessage, + RunAgentResumeItem, StreamChunk, StructuredOutputPart, UIResourcePart, @@ -17,7 +19,20 @@ import type { ConnectionAdapter } from './connection-adapters' import type { AIDevtoolsClientMetadata } from './devtools' import type { ChatDevtoolsBridgeFactory } from './devtools-noop' -export type { StructuredOutputPart } from '@tanstack/ai/client' +export type { StructuredOutputPart } + +export interface ChatResumeState { + threadId: string + runId: string + cursor: string +} + +export type ChatPendingInterrupt = Interrupt + +export interface ChatResumeSnapshot { + resumeState: ChatResumeState + pendingInterrupts?: Array +} /** * `messages` is the full UIMessage history (not a delta). `data` is the @@ -31,6 +46,8 @@ export interface ChatFetcherInput { data?: Record threadId: string runId: string + cursor?: string + resume?: Array } export interface ChatFetcherOptions { @@ -269,23 +286,38 @@ export interface UIMessage< createdAt?: Date } -export interface ChatClientPersistence< - TTools extends ReadonlyArray = any, -> { +export interface ChatStorageAdapter { getItem: ( id: string, - ) => - | Array> - | null - | undefined - | Promise> | null | undefined> - setItem: ( - id: string, - messages: Array>, - ) => void | Promise + ) => TValue | null | undefined | Promise + setItem: (id: string, value: TValue) => void | Promise removeItem: (id: string) => void | Promise } +export type ChatClientPersistence< + TTools extends ReadonlyArray = any, +> = ChatStorageAdapter>> + +export type ChatServerPersistence = ChatStorageAdapter + +/** + * @deprecated Passing a message adapter directly as `persistence: adapter` + * remains supported for compatibility. Use `persistence: { client: adapter }` + * instead. + */ +export type LegacyChatPersistenceOptions< + TTools extends ReadonlyArray = any, +> = ChatClientPersistence + +export type ChatPersistenceOptions< + TTools extends ReadonlyArray = any, +> = + | LegacyChatPersistenceOptions + | { + client?: ChatClientPersistence + server?: ChatServerPersistence + } + type IsUnknown = unknown extends T ? [T] extends [unknown] ? true @@ -377,9 +409,13 @@ export interface ChatClientBaseOptions< initialMessages?: Array> /** - * Optional persistence adapter for chat messages. + * Optional persistence adapters for chat state. + * + * Prefer `persistence: { client, server }`. `client` stores client-rendered + * `UIMessage[]` using this chat's `id`; `server` stores the resume snapshot + * `{ resumeState, pendingInterrupts }` using this chat's `threadId`. */ - persistence?: ChatClientPersistence + persistence?: ChatPersistenceOptions /** * Unique identifier for this chat instance @@ -393,6 +429,22 @@ export interface ChatClientBaseOptions< */ threadId?: string + /** + * Whether to auto-resume an interrupted run when {@link maybeAutoResume} is + * called (e.g. by a framework integration on mount / when the tab comes back + * online). Requires server-side persistence so the run's events can be + * replayed by `runId + cursor`. Defaults to `true`; set `false` to opt out. + */ + autoResume?: boolean + + /** + * Initial resumable run state, useful when rehydrating a persisted client + * after a full page reload. The server still owns replay/resume decisions; + * this only restores the client-side interrupt descriptors needed to send + * AG-UI resume entries. + */ + initialResumeSnapshot?: ChatResumeSnapshot + /** * Arbitrary client-controlled JSON forwarded to the server in the * AG-UI `RunAgentInput.forwardedProps` field. Use this for per-session @@ -480,6 +532,14 @@ export interface ChatClientBaseOptions< */ onSessionGeneratingChange?: (isGenerating: boolean) => void + /** + * Callback when resumable run state or pending interrupts change. + */ + onResumeStateChange?: ( + resumeState: ChatResumeState | null, + pendingInterrupts: Array, + ) => void + /** * Callback when a custom event is received from a server-side tool. * Custom events are emitted by tools using `context.emitCustomEvent()` during execution. diff --git a/packages/ai-client/tests/chat-client-resume.test.ts b/packages/ai-client/tests/chat-client-resume.test.ts new file mode 100644 index 000000000..8973c0d55 --- /dev/null +++ b/packages/ai-client/tests/chat-client-resume.test.ts @@ -0,0 +1,1833 @@ +import { describe, expect, it, vi } from 'vitest' +import { EventType } from '@tanstack/ai/client' +import { ChatClient } from '../src/chat-client' +import { + createApprovalToolCallChunks, + createToolCallChunks, +} from './test-utils' +import type { + ConnectConnectionAdapter, + RunAgentInputContext, +} from '../src/connection-adapters' +import type { RunAgentResumeItem, StreamChunk } from '@tanstack/ai/client' +import type { ChatResumeSnapshot, ChatServerPersistence } from '../src/types' + +/** + * Adapter that records each connect's runContext and yields scripted chunks. + * A script can be a function of the live `runContext` (so a test can emit a + * RUN_FINISHED carrying the same runId the client generated and passed in). + */ +type Script = + | Array + | ((ctx: RunAgentInputContext | undefined) => Array) + +function recordingAdapter(scripts: Array