From 80e9f9ecae9c2f3f0d0180032cd7acc43f2799a9 Mon Sep 17 00:00:00 2001 From: Justin Carper Date: Thu, 25 Jun 2026 11:44:44 -0500 Subject: [PATCH] fix(session): retry resumed turns that fail against an expired agent MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A pooled Cursor agent can pass resume() yet fail the subsequent send when Cursor's server has already expired it — surfacing as `Cursor run ended with status "error"` after a session sits idle. acquireAgent only wrapped resumeAgent() in its create-fallback, so a successful-resume-then-failed-send went uncaught and failed the turn (server retention is shorter than our local 7-day reuse window and is undocumented). agentRun now wraps the resumed-turn stream: on a resumed turn that throws before emitting any event (and is not aborted), it re-creates a fresh agent, replays the full transcript, and re-pools under the same session, overwriting the dead agentId. Guarded to a single attempt; never retries a fresh-create turn, an already-emitting stream, or a user abort. --- src/provider/language-model.ts | 69 ++++++- test/language-model.test.ts | 364 +++++++++++++++++++++++++++++++++ 2 files changed, 426 insertions(+), 7 deletions(-) create mode 100644 test/language-model.test.ts diff --git a/src/provider/language-model.ts b/src/provider/language-model.ts index 7cdcd62..5e30ee6 100644 --- a/src/provider/language-model.ts +++ b/src/provider/language-model.ts @@ -184,7 +184,10 @@ export class CursorLanguageModel implements LanguageModelV3 { } } - const acquired = await acquireAgent({ + // Shared acquire params. The retry path reuses this verbatim (minus + // resumeAgentId) so a fresh agent can never drift from the first attempt's + // config (sandbox, settingSources, MCP, etc.). + const baseAcquire = { apiKey: this.requireApiKey(), modelSelection, mode, @@ -198,9 +201,13 @@ export class CursorLanguageModel implements LanguageModelV3 { ...(mcpServers ? { mcpServers } : {}), ...(this.config.agents ? { agents: this.config.agents } : {}), ...(poolKey ? { name: `opencode/${sessionID!.slice(-8)}` } : {}), - ...(resumeAgentId ? { resumeAgentId } : {}), ...(poolKey ? { poolKey } : {}), ...(record ? { record } : {}), + }; + + const acquired = await acquireAgent({ + ...baseAcquire, + ...(resumeAgentId ? { resumeAgentId } : {}), }); // A resumed agent already remembers the prior conversation, so send only the @@ -210,13 +217,61 @@ export class CursorLanguageModel implements LanguageModelV3 { promptToCursorMessage(options.prompt)) : promptToCursorMessage(options.prompt); + let yielded = false; + let releasedOriginal = false; try { - yield* streamAgentTurn(acquired.agent, message, { - mode, - abortSignal: options.abortSignal, - }); + try { + for await (const event of streamAgentTurn(acquired.agent, message, { + mode, + abortSignal: options.abortSignal, + })) { + yielded = true; + yield event; + } + } catch (err) { + // Resume-aware retry: a resumed agent can pass resume() yet fail the + // actual send when Cursor's server has already expired the agent (its + // server-side retention is shorter than our local 7-day reuse window, + // and not documented). If nothing has been emitted downstream yet and + // the user hasn't aborted, transparently re-create a fresh agent and + // replay the full transcript — self-healing, no context loss. The + // fresh agent re-pools under the same session (overwriting the dead + // agentId) via acquireAgent's existing pooling path. + if ( + acquired.resumed && + !yielded && + !options.abortSignal?.aborted + ) { + acquired.release(); + releasedOriginal = true; + // A fresh create (no resumeAgentId) re-pools under the same + // session, overwriting the dead agentId. If re-acquiring itself + // fails (e.g. transient create error), surface that but keep the + // original resume failure as the cause for diagnosability. + let retry: Awaited>; + try { + retry = await acquireAgent({ ...baseAcquire }); + } catch (retryErr) { + if (retryErr instanceof Error && retryErr.cause === undefined) { + retryErr.cause = err; + } + throw retryErr; + } + try { + const replay = promptToCursorMessage(options.prompt); + yield* streamAgentTurn(retry.agent, replay, { + mode, + abortSignal: options.abortSignal, + }); + } finally { + retry.release(); + } + } else { + throw err; + } + } } finally { - acquired.release(); + if (!releasedOriginal) acquired.release(); } } diff --git a/test/language-model.test.ts b/test/language-model.test.ts new file mode 100644 index 0000000..2108acf --- /dev/null +++ b/test/language-model.test.ts @@ -0,0 +1,364 @@ +import { mkdtempSync } from "node:fs"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import type { LanguageModelV3StreamPart } from "@ai-sdk/provider"; +import type { Run, SDKUserMessage } from "@cursor/sdk"; + +// Sandbox the on-disk session store away from the user's real cache dir. +process.env.XDG_CACHE_HOME = mkdtempSync(join(tmpdir(), "cursor-lm-test-")); + +const create = vi.fn(); +const resume = vi.fn(); + +vi.mock("../src/cursor-runtime.js", () => ({ + loadCursorSdk: async () => ({ Agent: { create, resume } }), +})); + +vi.mock("../src/api-key.js", () => ({ + resolveCursorApiKey: () => "test-key", +})); + +const { CursorLanguageModel } = await import( + "../src/provider/language-model.js" +); +const { + clearAgentPool, + getPooledAgentId, + resetSessionPoolMemory, +} = await import("../src/provider/session-pool.js"); + +type OnDelta = (input: { + update: Record & { type: string }; +}) => void; + +interface FakeAgentOpts { + agentId: string; + /** Updates to emit via onDelta before the run resolves. */ + updates?: Array & { type: string }>; + /** Final run status + result. */ + result?: { status: string; result?: string }; + /** Captures the message passed to send() for assertions. */ + sentMessages?: SDKUserMessage[]; +} + +/** Build a fake AgentLike whose send() drives onDelta and resolves wait(). */ +function fakeAgent(opts: FakeAgentOpts) { + return { + agentId: opts.agentId, + send: async ( + message: SDKUserMessage, + sendOptions?: Record, + ) => { + opts.sentMessages?.push(message); + const onDelta = sendOptions?.["onDelta"] as OnDelta | undefined; + for (const update of opts.updates ?? []) onDelta?.({ update }); + const run: Partial = { + wait: async () => + (opts.result ?? { status: "finished", result: "ok" }) as never, + cancel: async () => {}, + }; + return run as Run; + }, + close: vi.fn(), + } as unknown as import("../src/provider/agent-backend.js").AgentLike; +} + +const sys = (text: string) => ({ role: "system" as const, content: text }); +const user = (text: string) => ({ + role: "user" as const, + content: [{ type: "text" as const, text }], +}); + +function makeModel() { + return new CursorLanguageModel("m", { + providerName: "cursor", + cwd: "/tmp", + mode: "agent", + session: "auto", + }); +} + +async function streamCall( + model: ReturnType, + opts: Parameters["doStream"]>[0], +): Promise> { + const r = await model.doStream(opts); + return r.stream; +} + +async function collectStream( + stream: + | ReadableStream + | Promise>, +): Promise { + const reader = (await stream).getReader(); + const out: LanguageModelV3StreamPart[] = []; + for (;;) { + const { done, value } = await reader.read(); + if (done) break; + out.push(value); + } + return out; +} + +const eventTypes = (parts: LanguageModelV3StreamPart[]) => + parts.map((p) => p.type); + +beforeEach(() => { + create.mockReset(); + resume.mockReset(); + clearAgentPool(); +}); + +afterEach(() => { + clearAgentPool(); +}); + +describe("CursorLanguageModel doStream — resume-aware retry", () => { + it("re-creates a fresh agent + full transcript when a resumed turn errors before emitting", async () => { + const model = makeModel(); + const firstSent: SDKUserMessage[] = []; + const retrySent: SDKUserMessage[] = []; + + // Turn 1: fresh create, pools under the session. + create.mockResolvedValueOnce( + fakeAgent({ agentId: "a1", sentMessages: firstSent }), + ); + await collectStream( + streamCall(model, { + prompt: [sys("S"), user("hi")], + providerOptions: { cursor: { sessionID: "s1" } }, + } as never), + ); + expect(getPooledAgentId("s1")).toBe("a1"); + + // Turn 2: resume → run ends "error" with no deltas. Retry → fresh create succeeds. + resume.mockResolvedValueOnce( + fakeAgent({ + agentId: "a1", + result: { status: "error", result: "agent expired" }, + }), + ); + create.mockResolvedValueOnce( + fakeAgent({ agentId: "a2", sentMessages: retrySent }), + ); + + const parts = await collectStream( + streamCall(model, { + prompt: [sys("S"), user("hi"), user("there")], + providerOptions: { cursor: { sessionID: "s1" } }, + } as never), + ); + + // Retry produced a finish, not an error. + expect(eventTypes(parts)).not.toContain("error"); + expect(eventTypes(parts)).toContain("finish"); + + // Resume attempted once; create called twice (turn 1 + retry). + expect(resume).toHaveBeenCalledOnce(); + expect(create).toHaveBeenCalledTimes(2); + + // Pool re-pointed to the fresh agent. + expect(getPooledAgentId("s1")).toBe("a2"); + + // Resumed turn sent only the latest message; retry sent the full transcript. + expect(firstSent[0]?.text).toContain("hi"); + expect(retrySent[0]?.text).toContain("# User\nhi"); + expect(retrySent[0]?.text).toContain("there"); + }); + + it("does not retry when a resumed turn errors after already yielding events", async () => { + const model = makeModel(); + + // Turn 1: pool a1. + create.mockResolvedValueOnce(fakeAgent({ agentId: "a1" })); + await collectStream( + streamCall(model, { + prompt: [sys("S"), user("hi")], + providerOptions: { cursor: { sessionID: "s1" } }, + } as never), + ); + + // Turn 2: resume emits a text delta, THEN errors. + resume.mockResolvedValueOnce( + fakeAgent({ + agentId: "a1", + updates: [{ type: "text-delta", text: "partial" }], + result: { status: "error", result: "expired" }, + }), + ); + + const parts = await collectStream( + streamCall(model, { + prompt: [sys("S"), user("hi"), user("there")], + providerOptions: { cursor: { sessionID: "s1" } }, + } as never), + ); + + // Error propagates; no retry (create not called again). + expect(eventTypes(parts)).toContain("error"); + expect(create).toHaveBeenCalledOnce(); // turn 1 only + expect(resume).toHaveBeenCalledOnce(); + }); + + it("does not retry a fresh-create turn that errors (resumed === false)", async () => { + const model = makeModel(); + create.mockResolvedValueOnce( + fakeAgent({ agentId: "a1", result: { status: "error", result: "boom" } }), + ); + + const parts = await collectStream( + streamCall(model, { + prompt: [sys("S"), user("hi")], + providerOptions: { cursor: { sessionID: "s1" } }, + } as never), + ); + + expect(eventTypes(parts)).toContain("error"); + expect(create).toHaveBeenCalledOnce(); + expect(resume).not.toHaveBeenCalled(); + }); + + it("does not retry when the abort signal is already fired", async () => { + const model = makeModel(); + + // Turn 1: pool a1. + create.mockResolvedValueOnce(fakeAgent({ agentId: "a1" })); + await collectStream( + streamCall(model, { + prompt: [sys("S"), user("hi")], + providerOptions: { cursor: { sessionID: "s1" } }, + } as never), + ); + + // Turn 2: resume errors, but the user already aborted. + resume.mockResolvedValueOnce( + fakeAgent({ + agentId: "a1", + result: { status: "error", result: "expired" }, + }), + ); + const ac = new AbortController(); + ac.abort(); + + const parts = await collectStream( + streamCall(model, { + prompt: [sys("S"), user("hi"), user("there")], + providerOptions: { cursor: { sessionID: "s1" } }, + abortSignal: ac.signal, + } as never), + ); + + // No retry; error propagates. + expect(eventTypes(parts)).toContain("error"); + expect(create).toHaveBeenCalledOnce(); // turn 1 only + expect(resume).toHaveBeenCalledOnce(); + }); + + it("propagates the error when the retry itself also fails", async () => { + const model = makeModel(); + + // Turn 1: pool a1. + create.mockResolvedValueOnce(fakeAgent({ agentId: "a1" })); + await collectStream( + streamCall(model, { + prompt: [sys("S"), user("hi")], + providerOptions: { cursor: { sessionID: "s1" } }, + } as never), + ); + + // Turn 2: resume errors (no emit), retry create also errors. + resume.mockResolvedValueOnce( + fakeAgent({ + agentId: "a1", + result: { status: "error", result: "expired" }, + }), + ); + create.mockResolvedValueOnce( + fakeAgent({ agentId: "a2", result: { status: "error", result: "boom" } }), + ); + + const parts = await collectStream( + streamCall(model, { + prompt: [sys("S"), user("hi"), user("there")], + providerOptions: { cursor: { sessionID: "s1" } }, + } as never), + ); + + expect(eventTypes(parts)).toContain("error"); + expect(resume).toHaveBeenCalledOnce(); + expect(create).toHaveBeenCalledTimes(2); + }); + + it("retries a non-pooled explicit-agentId resume, closing both agents and pooling neither", async () => { + const model = makeModel(); + + // Explicit agentId: usePool is false, so the turn resumes without pooling. + const original = fakeAgent({ + agentId: "explicit", + result: { status: "error", result: "expired" }, + }) as unknown as { close: ReturnType }; + const fresh = fakeAgent({ agentId: "fresh" }) as unknown as { + close: ReturnType; + }; + resume.mockResolvedValueOnce(original); + create.mockResolvedValueOnce(fresh); + + const parts = await collectStream( + streamCall(model, { + prompt: [sys("S"), user("hi")], + providerOptions: { cursor: { sessionID: "s1", agentId: "explicit" } }, + } as never), + ); + + // Retry succeeded: a finish, no error. + expect(eventTypes(parts)).not.toContain("error"); + expect(eventTypes(parts)).toContain("finish"); + expect(resume).toHaveBeenCalledWith("explicit", expect.anything()); + expect(create).toHaveBeenCalledOnce(); + + // Non-pooled: both agents closed on release; nothing pooled under the session. + expect(original.close).toHaveBeenCalled(); + expect(fresh.close).toHaveBeenCalled(); + expect(getPooledAgentId("s1")).toBeUndefined(); + }); + + it("chains the original resume failure as cause when re-acquire throws", async () => { + const model = makeModel(); + + // Turn 1: pool a1. + create.mockResolvedValueOnce(fakeAgent({ agentId: "a1" })); + await collectStream( + streamCall(model, { + prompt: [sys("S"), user("hi")], + providerOptions: { cursor: { sessionID: "s1" } }, + } as never), + ); + + // Turn 2: resume errors (no emit); the retry's acquireAgent itself rejects. + resume.mockResolvedValueOnce( + fakeAgent({ + agentId: "a1", + result: { status: "error", result: "expired" }, + }), + ); + create.mockRejectedValueOnce(new Error("create failed")); + + const parts = await collectStream( + streamCall(model, { + prompt: [sys("S"), user("hi"), user("there")], + providerOptions: { cursor: { sessionID: "s1" } }, + } as never), + ); + + const errPart = parts.find((p) => p.type === "error") as + | { type: "error"; error: unknown } + | undefined; + expect(errPart).toBeDefined(); + const error = errPart?.error as Error; + expect(error.message).toBe("create failed"); + // Original resume failure preserved as the cause for diagnosability. + expect((error.cause as Error)?.message).toContain("error"); + }); +}); \ No newline at end of file