From 1c3a0baba4eb9bc9d57e0a5d03380f8bd32ab4c9 Mon Sep 17 00:00:00 2001 From: Ishan Dhanani Date: Wed, 10 Jun 2026 19:03:57 +0000 Subject: [PATCH 1/6] feat: seed root trajectory id + drop vestigial open action Two radix-native subagent-session lifecycle fixes: 1. Root trajectory seed. applySubagentBridge only populates parent_trajectory_id when a child inherits a non-empty DYN_AGENT_TRAJECTORY_ID. The root never set one, so the first generation of pi-subagents had nothing to inherit -- the bridge no-opped and every agent appeared as its own flat top-level session (session_id == trajectory_id, no parent). seedRootTrajectory() seeds it at the root (trace on, not a PI_SUBAGENT_CHILD, id unset; prefers DYN_AGENT_SESSION_ID), so spawned subagents now carry parent_trajectory_id. 2. Drop the vestigial open action. The radix-native backend is close-only / implicit-open (the tag is the only session state); the first turn's action:"open" was ignored. controlForTurn now emits a bare session_id each turn; close is unchanged. Also fixes pre-existing tsc errors in program-close.test.ts (optional handler invocation under noUncheckedIndexedAccess). Signed-off-by: Ishan Dhanani --- src/dynamo-provider.ts | 50 +++++++++++++++++++++---------- src/index.ts | 5 ++++ test/dynamo-provider.test.ts | 58 ++++++++++++++++++++++++++++-------- test/program-close.test.ts | 8 ++--- 4 files changed, 89 insertions(+), 32 deletions(-) diff --git a/src/dynamo-provider.ts b/src/dynamo-provider.ts index 6e02b2c..3565562 100644 --- a/src/dynamo-provider.ts +++ b/src/dynamo-provider.ts @@ -193,6 +193,28 @@ export function applySubagentBridge(env: NodeJS.ProcessEnv = process.env): boole return true; } +/** + * Seed a root trajectory id so spawned pi-subagents have a parent to inherit. 
+ * `applySubagentBridge` only fires when a child inherits a non-empty + * `DYN_AGENT_TRAJECTORY_ID`; if the root never sets one, the first generation of + * subagents inherits nothing, the bridge no-ops, and the whole chain stays flat + * (no `parent_trajectory_id`). Only the ROOT seeds — a pi-subagents child already + * inherits its parent's id, and a caller-set id wins. Uses `DYN_AGENT_SESSION_ID` + * when present (root trajectory == its session) else a fresh id. Gated on + * `DYN_AGENT_TRACE`. Mutates env in place; must run before any subagent spawn. + * Returns whether a seed was written. + */ +export function seedRootTrajectory( + env: NodeJS.ProcessEnv = process.env, + mkId: () => string = randomUUID, +): boolean { + if (!isTruthyEnv(getEnvValue(env, "DYN_AGENT_TRACE"))) return false; + if (getEnvValue(env, "PI_SUBAGENT_CHILD") === "1") return false; + if (getEnvValue(env, "DYN_AGENT_TRAJECTORY_ID")) return false; + env.DYN_AGENT_TRAJECTORY_ID = getEnvValue(env, "DYN_AGENT_SESSION_ID") ?? mkId(); + return true; +} + function parsePositiveIntOrUndefined(value: string | undefined): number | undefined { if (value === undefined) return undefined; const parsed = Number.parseInt(value, 10); @@ -364,15 +386,14 @@ function toOpenAICompletionsModel(model: Model): Model<"openai-completions" type FetchLike = (input: string, init: RequestInit) => Promise<{ ok: boolean; status: number }>; /** - * One streaming session for one pi-subagents child process. Dynamo has no - * standalone close RPC — close must ride a routed request — so the lifecycle is: - * - first turn -> action "open" (+ timeout): the worker holds this subagent's - * KV in a dedicated slot outside the radix tree, invisible to eviction. - * - later turns -> bare session_id: the router pins them to the same worker - * (O(1) KV restore). + * One session for one pi-subagents child process. 
The radix-native backend has + * no open step (the tag is the only session state), so the lifecycle is: + * - every turn -> bare session_id: tags that turn's KV as ordinary, evictable + * radix (LRU-neutral — no pinned slot), and keys sticky routing to one worker. * - on agent_end -> a throwaway max_tokens=1 request carries action "close", - * freeing the KV deterministically instead of waiting out the idle timeout. - * The idle timeout is the safety net if the process dies before close fires. + * bulk-freeing this session's tagged KV instead of waiting for LRU. + * Dynamo has no standalone close RPC — close must ride a routed request. If close + * never lands (e.g. SIGKILL), the KV is ordinary radix and LRU reclaims it. */ export class DynamoSubagentSession { readonly sessionId: string; @@ -397,13 +418,12 @@ export class DynamoSubagentSession { this.createRequestId = createRequestId; } - /** Build session_control for the current turn and advance lifecycle state. */ + /** Build session_control for the current turn. No open step under radix-native: + * the bare session_id tags this turn's KV; `opened` only gates the close. */ controlForTurn(): DynamoSessionControl { - const action = this.opened ? undefined : ("open" as const); this.opened = true; return { session_id: this.sessionId, - ...(action ? { action } : {}), ...(this.timeoutSecs !== undefined ? { timeout: this.timeoutSecs } : {}), }; } @@ -414,10 +434,10 @@ export class DynamoSubagentSession { * block Pi's shutdown on it). The 5s timeout bounds a hung frontend; the idle * reaper covers the case where this never lands. * - * Re-openable: clearing `opened` synchronously both guards a double-fire - * (agent_end then session_shutdown) and lets a later turn re-emit `action: - * "open"`. agent_end fires once per prompt, so a multi-prompt subagent frees - * its KV between prompts and re-warms a fresh session on the next one. 
+ * Re-armable: clearing `opened` synchronously both guards a double-fire + * (agent_end then session_shutdown) and lets a later turn re-tag a fresh + * session. agent_end fires once per prompt, so a multi-prompt subagent frees + * its KV between prompts and re-warms on the next one's first tagged turn. */ async close(fetchImpl: FetchLike = fetch): Promise { if (!this.opened) return false; diff --git a/src/index.ts b/src/index.ts index 99c9016..279dc70 100644 --- a/src/index.ts +++ b/src/index.ts @@ -13,10 +13,15 @@ import { DynamoSubagentSession, discoverDynamoModels, readDynamoConfig, + seedRootTrajectory, } from "./dynamo-provider.js"; import { registerDynamoToolEventRelay } from "./tool-relay.js"; export default async function dynamoProviderExtension(pi: ExtensionAPI): Promise { + // Seed a root trajectory id (root only) BEFORE anything spawns subagents, so + // the first generation of pi-subagents has a parent to inherit; without it the + // bridge no-ops and the whole chain stays flat (no parent_trajectory_id). + seedRootTrajectory(); // Mutate process.env BEFORE readDynamoConfig so the rewrite also reaches // any pi-subagents this process later spawns. readDynamoConfig itself // recomputes the rewrite independently, so omitting this call still diff --git a/test/dynamo-provider.test.ts b/test/dynamo-provider.test.ts index 3db8673..170c298 100644 --- a/test/dynamo-provider.test.ts +++ b/test/dynamo-provider.test.ts @@ -18,6 +18,7 @@ import { mergeDynamoSessionControl, normalizeDynamoBaseUrl, readDynamoConfig, + seedRootTrajectory, } from "../src/dynamo-provider.js"; // Spread `base` with the given keys dropped (env-absent). 
Avoids the @@ -169,6 +170,39 @@ describe("pi-subagents trajectory bridge", () => { }); }); +describe("root trajectory seed", () => { + it("seeds DYN_AGENT_TRAJECTORY_ID at the root so subagents inherit a parent", () => { + const env: NodeJS.ProcessEnv = { DYN_AGENT_TRACE: "1" }; + expect(seedRootTrajectory(env, () => "root-traj")).toBe(true); + expect(env.DYN_AGENT_TRAJECTORY_ID).toBe("root-traj"); + // The bug fix: a subagent spawned from this env now resolves a parent. + const childEnv = { + ...env, + PI_SUBAGENT_CHILD: "1", + PI_SUBAGENT_RUN_ID: "run-1", + PI_SUBAGENT_CHILD_AGENT: "researcher", + }; + expect(computeSubagentTrajectoryRewrite(childEnv)).toEqual({ + parentTrajectoryId: "root-traj", + trajectoryId: "run-1:researcher:0", + }); + }); + + it("uses DYN_AGENT_SESSION_ID as the root trajectory when present", () => { + const env: NodeJS.ProcessEnv = { DYN_AGENT_TRACE: "1", DYN_AGENT_SESSION_ID: "sess-7" }; + expect(seedRootTrajectory(env, () => "unused")).toBe(true); + expect(env.DYN_AGENT_TRAJECTORY_ID).toBe("sess-7"); + }); + + it("no-ops when trace is off, in a subagent child, or trajectory already set", () => { + expect(seedRootTrajectory({}, () => "x")).toBe(false); + expect(seedRootTrajectory({ DYN_AGENT_TRACE: "1", PI_SUBAGENT_CHILD: "1" }, () => "x")).toBe(false); + const preset: NodeJS.ProcessEnv = { DYN_AGENT_TRACE: "1", DYN_AGENT_TRAJECTORY_ID: "caller" }; + expect(seedRootTrajectory(preset, () => "x")).toBe(false); + expect(preset.DYN_AGENT_TRAJECTORY_ID).toBe("caller"); + }); +}); + describe("agent context injection", () => { it("defaults both trajectory_id and session_id to the Pi session ID", () => { expect(buildDynamoAgentContext(config, { sessionId: "pi-session" })).toEqual({ @@ -351,7 +385,7 @@ describe("subagent session control", () => { sessionControlId: "run-1:scout:3", sessionTimeoutSecs: 60, }); - expect(session.controlForTurn()).toEqual({ session_id: "run-1:scout:3", action: "open", timeout: 60 }); + 
expect(session.controlForTurn()).toEqual({ session_id: "run-1:scout:3", timeout: 60 }); expect(session.controlForTurn()).toEqual({ session_id: "run-1:scout:3", timeout: 60 }); expect(session.controlForTurn()).toEqual({ session_id: "run-1:scout:3", timeout: 60 }); }); @@ -362,25 +396,25 @@ describe("subagent session control", () => { apiKey: "k", sessionControlId: "sess-1", }); - expect(session.controlForTurn()).toEqual({ session_id: "sess-1", action: "open" }); + expect(session.controlForTurn()).toEqual({ session_id: "sess-1" }); }); it("merges nvext.session_control without dropping existing nvext fields", () => { const payload = mergeDynamoSessionControl( { model: "demo", nvext: { extra_fields: ["worker_id"], agent_context: { phase: "reasoning" } } }, - { session_id: "sess-1", action: "open", timeout: 60 }, + { session_id: "sess-1", timeout: 60 }, ); expect(payload).toEqual({ model: "demo", nvext: { extra_fields: ["worker_id"], agent_context: { phase: "reasoning" }, - session_control: { session_id: "sess-1", action: "open", timeout: 60 }, + session_control: { session_id: "sess-1", timeout: 60 }, }, }); }); - it("close fires a throwaway action:close request, is idempotent, and skips before any open", async () => { + it("close fires a throwaway action:close request, is idempotent, and skips before any turn", async () => { const calls: Array<{ url: string; body: unknown; headers: unknown }> = []; const fakeFetch = async (url: string, init: RequestInit) => { calls.push({ @@ -396,11 +430,11 @@ describe("subagent session control", () => { () => "close-req-1", ); - // No turn has opened the session yet: close is a no-op. + // No turn has tagged the session yet: close is a no-op. 
expect(await session.close(fakeFetch)).toBe(false); expect(calls).toHaveLength(0); - session.controlForTurn(); // open + session.controlForTurn(); // first tagged turn session.modelId = "zai-org/GLM-4.7-Flash"; expect(await session.close(fakeFetch)).toBe(true); @@ -417,7 +451,7 @@ describe("subagent session control", () => { expect((calls[0]?.headers as Record)["x-request-id"]).toBe("close-req-1"); }); - it("re-opens on the next turn after a close (multi-prompt subagent)", async () => { + it("re-arms on the next turn after a close (multi-prompt subagent)", async () => { const fakeFetch = async () => ({ ok: true, status: 200 }); const session = new DynamoSubagentSession({ baseUrl: "http://dynamo.test/v1", @@ -425,12 +459,11 @@ describe("subagent session control", () => { sessionControlId: "sess-1", }); - expect(session.controlForTurn()).toEqual({ session_id: "sess-1", action: "open" }); + expect(session.controlForTurn()).toEqual({ session_id: "sess-1" }); expect(session.controlForTurn()).toEqual({ session_id: "sess-1" }); expect(await session.close(fakeFetch)).toBe(true); - // A later prompt's first turn must re-open rather than emit a bare, - // already-freed session id. - expect(session.controlForTurn()).toEqual({ session_id: "sess-1", action: "open" }); + // A later prompt's first turn re-tags the session; close fires again. 
+ expect(session.controlForTurn()).toEqual({ session_id: "sess-1" }); expect(await session.close(fakeFetch)).toBe(true); }); @@ -462,7 +495,6 @@ describe("subagent session control", () => { expect(injected.nvext.agent_context).toMatchObject({ phase: "reasoning" }); expect(injected.nvext.session_control).toEqual({ session_id: "run-1:scout:3", - action: "open", timeout: 60, }); expect(session.modelId).toBe(DEFAULT_DYNAMO_MODEL_ID); diff --git a/test/program-close.test.ts b/test/program-close.test.ts index 42a90b1..2532f46 100644 --- a/test/program-close.test.ts +++ b/test/program-close.test.ts @@ -68,8 +68,8 @@ describe("program close (trajectory_final) — multiturn", () => { const closeBodies = installFetch(); const { pi, handlers } = makePi(); await dynamoProviderExtension(pi as any); - await handlers.session_shutdown({ type: "session_shutdown", reason: "reload" }); - await handlers.session_shutdown({ type: "session_shutdown", reason: "fork" }); + await handlers.session_shutdown!({ type: "session_shutdown", reason: "reload" }); + await handlers.session_shutdown!({ type: "session_shutdown", reason: "fork" }); expect(closeBodies).toHaveLength(0); }); @@ -77,9 +77,9 @@ describe("program close (trajectory_final) — multiturn", () => { const closeBodies = installFetch(); const { pi, handlers } = makePi(); await dynamoProviderExtension(pi as any); - await handlers.session_shutdown({ type: "session_shutdown", reason: "quit" }); + await handlers.session_shutdown!({ type: "session_shutdown", reason: "quit" }); // idempotent: a second quit (or any later event) must not re-close - await handlers.session_shutdown({ type: "session_shutdown", reason: "quit" }); + await handlers.session_shutdown!({ type: "session_shutdown", reason: "quit" }); expect(closeBodies).toHaveLength(1); const ctx = closeBodies[0].nvext.agent_context; expect(ctx.trajectory_final).toBe(true); From 1b18a9d7ace8ab3c02163d1128f881fa9689c9fa Mon Sep 17 00:00:00 2001 From: Ishan Dhanani Date: Mon, 15 Jun 2026 
12:58:46 +0000 Subject: [PATCH 2/6] feat: use trajectory-native radix release Signed-off-by: Ishan Dhanani --- CLAUDE.md | 11 +- README.md | 69 +++++----- scripts/install-dynamo.sh | 2 +- scripts/integration-smoke.sh | 12 +- scripts/launch-agg-agent.sh | 24 ++-- src/dynamo-provider.ts | 248 ++++++++++------------------------- src/index.ts | 83 ++---------- src/tool-relay.ts | 65 +++------ test/dynamo-provider.test.ts | 175 ++++++++---------------- test/integration/smoke.mjs | 4 +- test/program-close.test.ts | 38 +++++- test/tool-relay.test.ts | 28 ++-- 12 files changed, 276 insertions(+), 483 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index f2fcb6c..f84d45a 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -8,8 +8,8 @@ SPDX-License-Identifier: Apache-2.0 Pi extension registering a `dynamo` provider for Dynamo's OpenAI-compatible chat-completions endpoint. Three source files in `src/` (~650 lines total): - `index.ts` — extension entrypoint; calls `readDynamoConfig`, discovers models via `/v1/models`, registers the provider, wires the tool-event relay. -- `dynamo-provider.ts` — config + agent_context construction + streamSimple wrapper + subagent `session_control`. Reads `DYN_AGENT_*` and `PI_SUBAGENT_*` env vars. Gated by the `DYN_AGENT_TRACE` master switch: when set, emits `nvext.agent_context` on every LLM request and drives subagent KV sessions; when unset, registers a plain `dynamo/` provider. -- `tool-relay.ts` — ZMQ PUSH publisher for Pi tool events. Connects to a Dynamo-bound PULL endpoint. Wire format: `[topic, seq_be_u64, msgpack(AgentTraceRecord)]`. +- `dynamo-provider.ts` — config + agent_context construction + streamSimple wrapper. Reads `DYN_REQUEST_TRACE`, `DYN_AGENT_*`, and `PI_SUBAGENT_*` env vars. Gated by the `DYN_REQUEST_TRACE` master switch: when set, emits `nvext.agent_context` on every LLM request and sends `trajectory_final` at trajectory end; when unset, registers a plain `dynamo/` provider. 
+- `tool-relay.ts` — ZMQ PUSH publisher for Pi tool events. Connects to a Dynamo-bound PULL endpoint. Wire format: `[topic, seq_be_u64, msgpack(RequestTraceRecord)]`. ## Build, test, check @@ -22,7 +22,7 @@ npm run build # tsc -p tsconfig.build.json → dist/ Tests live in `test/` as siblings of `src/`. Use vitest's `describe`/`it`/`expect`. Mirror the existing structure: one test file per source file, fixture data inline rather than separate fixture files. -`test/integration/smoke.mjs` is the out-of-band end-to-end check — driven by `scripts/integration-smoke.sh`, not vitest. It boots Dynamo's frontend + mocker, sends one real chat completion, and asserts `nvext.agent_context` round-trips into the trace JSONL. Two cases: top-level agent_context and the pi-subagents bridge. Mocker output is garbage; assertions only target the trace envelope. CI clones `ai-dynamo/dynamo@main` and builds from source — published wheels lag behind the agent trace sink surface, so the wheel path can't actually exercise this package. Cargo cache keeps warm runs ~60-90s, cold ~10 min. `workflow_dispatch` accepts a `dynamo_ref` input for ad-hoc validation against a specific branch, tag, or SHA. +`test/integration/smoke.mjs` is the out-of-band end-to-end check — driven by `scripts/integration-smoke.sh`, not vitest. It boots Dynamo's frontend + mocker, sends one real chat completion, and asserts `nvext.agent_context` round-trips into the request trace JSONL. Two cases: top-level agent_context and the pi-subagents bridge. Mocker output is garbage; assertions only target the trace envelope. CI clones `ai-dynamo/dynamo@main` and builds from source. Cargo cache keeps warm runs ~60-90s, cold ~10 min. `workflow_dispatch` accepts a `dynamo_ref` input for ad-hoc validation against a specific branch, tag, or SHA. ## Coding standards @@ -47,7 +47,8 @@ Tests live in `test/` as siblings of `src/`. 
Use vitest's `describe`/`it`/`expec | Prefix | Direction | Examples | |---|---|---| | `DYNAMO_*` | client config (we read) | `DYNAMO_BASE_URL`, `DYNAMO_API_KEY` | -| `DYN_AGENT_*` | dynamo agent context (we read + emit) | `DYN_AGENT_SESSION_ID`, `DYN_AGENT_TRAJECTORY_ID`, `DYN_AGENT_TOOL_EVENTS_ZMQ_ENDPOINT` | +| `DYN_AGENT_*` | dynamo agent context (we read + emit) | `DYN_AGENT_SESSION_ID`, `DYN_AGENT_TRAJECTORY_ID` | +| `DYN_REQUEST_TRACE*` | request trace switch and tool bridge | `DYN_REQUEST_TRACE`, `DYN_REQUEST_TRACE_TOOL_EVENTS_ZMQ_ENDPOINT` | | `PI_SUBAGENT_*` | pi-subagents bookkeeping (we read only) | `PI_SUBAGENT_CHILD`, `PI_SUBAGENT_RUN_ID`, `PI_SUBAGENT_CHILD_AGENT`, `PI_SUBAGENT_CHILD_INDEX` | | `OPENAI_BASE_URL` | OpenAI-compatibility fallback (we read) | only consulted when `DYNAMO_BASE_URL` is unset | @@ -70,5 +71,5 @@ External contributions are not currently accepted. This is an NVIDIA-internal co - The `nvext.agent_context` schema field names match ATIF (`session_type_id`, `session_id`, `trajectory_id`, `parent_trajectory_id`). Don't rename them — downstream tooling in Dynamo's converter and benchmark stack joins on these. - The `phase: "reasoning"` field is deliberately hardcoded; it tags the LLM call as an agent reasoning step (vs. e.g. a synthesis or grading step). Adding other phase values requires Dynamo-side coordination. -- The `agent_trace.v1` schema is owned upstream by Dynamo (`dynamo/lib/llm/src/agents/trace/`). Don't change record shapes here without an upstream PR landing first. +- The `request.trace.v1` schema is owned upstream by Dynamo (`dynamo/lib/llm/src/request_trace/`). Don't change record shapes here without an upstream PR landing first. - `package-lock.json` churn from npm version differences should be reverted before committing (`git checkout -- package-lock.json` if a no-op edit appears). 
diff --git a/README.md b/README.md index 1175412..b780534 100644 --- a/README.md +++ b/README.md @@ -6,16 +6,16 @@ A Pi extension that registers a `dynamo` provider backed by [Dynamo](https://git pi --model dynamo/ ``` -With one switch (`DYN_AGENT_TRACE=1`) it also tags every request for Dynamo's agent trace, gives each pi-subagent its own isolated KV session, and can relay Pi tool events into the trace — all without patching `pi-mono`. +With one switch (`DYN_REQUEST_TRACE=1`) it also tags every request for Dynamo's request trace, gives each pi-subagent its own trajectory id, and can relay Pi tool events into the trace — all without patching `pi-mono`. ## What it does - **Model provider** — registers `dynamo`, discovers models from `/v1/models` (falls back to `dynamo/default`), and streams via Pi's OpenAI-compatible path. - **Agent context** — injects `nvext.agent_context` (session/trajectory identity) so Dynamo can attribute each LLM request in its trace. -- **Subagent KV isolation** — gives each [pi-subagents](https://github.com/nicobailon/pi-subagents) child its own Dynamo streaming session: opened on its first turn, pinned across turns, and freed deterministically when the subagent finishes. See [Subagent KV isolation](#subagent-kv-isolation). +- **Trajectory-native KV release** — gives each [pi-subagents](https://github.com/nicobailon/pi-subagents) child its own `trajectory_id`; Dynamo/SGLang tag requests by that id and release it when the trajectory finishes. See [Trajectory-native KV release](#trajectory-native-kv-release). - **Tool-event relay** — optionally pushes Pi `tool_start` / `tool_end` / `tool_error` events to Dynamo over ZMQ so one trace shows LLM spans and tool spans together. -Everything but the bare model provider is gated by the `DYN_AGENT_TRACE` master switch and is off by default. +Everything but the bare model provider is gated by the `DYN_REQUEST_TRACE` master switch and is off by default. 
## Install @@ -37,59 +37,59 @@ Point Pi at a running Dynamo endpoint: ```bash export DYNAMO_BASE_URL=http://127.0.0.1:8000/v1 export DYNAMO_API_KEY=dummy # local Dynamo usually ignores this; defaults to dynamo-local -export DYN_AGENT_TRACE=1 # opt into agent_context + subagent KV isolation +export DYN_REQUEST_TRACE=1 # opt into agent_context + trajectory finality pi --model dynamo/ -p "Reply exactly ok." ``` -That's the whole required setup. Everything else (`session_type_id`, `trajectory_id`, `session_id`, timeouts) has a sensible default and is only set when you want to override it — see [Configuration](#configuration). +That's the whole required setup. Everything else (`session_type_id`, `trajectory_id`, `session_id`) has a sensible default and is only set when you want to override it — see [Configuration](#configuration). -## Subagent KV isolation +## Trajectory-native KV release -Agentic runs spawn short-lived subagents that accumulate KV cache, use it for a few turns, then exit. Left in the shared radix tree, that ephemeral KV competes with the lead agent's long-lived prefix for eviction. Dynamo's streaming sessions hold a subagent's KV in a dedicated slot — invisible to eviction, freed on close. +Agentic runs spawn short-lived subagents that accumulate KV cache, use it for a few turns, then exit. Left in the shared radix tree, that ephemeral KV competes with the lead agent's long-lived prefix for eviction. Dynamo's session radix cache tags each request by `agent_context.trajectory_id` and bulk-releases that trajectory on `trajectory_final=true`. 
-When `DYN_AGENT_TRACE=1` and this process is a pi-subagents child, the provider drives that lifecycle automatically via `nvext.session_control`: +When `DYN_REQUEST_TRACE=1`, the provider drives that lifecycle through `nvext.agent_context`: ```mermaid sequenceDiagram - participant Child as Subagent (child pi process) + participant Root as Root pi process + participant Child as Subagent pi process participant Dynamo - Note over Child: session_id = runId:childAgent:childIndex - Child->>Dynamo: turn 1 action "open" (worker holds KV in a session slot) - Child->>Dynamo: turn 2+ session_id only (sticky: O(1) KV restore) - Note over Child: agent_end -> close request frees the KV deterministically + Root->>Dynamo: normal turn: trajectory_id = T_root + Child->>Dynamo: normal turn: trajectory_id = T_child
<br/>parent_trajectory_id = T_root + Child->>Dynamo: agent_end: trajectory_id = T_child<br/>trajectory_final = true + Root->>Dynamo: quit: trajectory_id = T_root<br/>
trajectory_final = true ``` -- The session id is the subagent's own identity (`PI_SUBAGENT_RUN_ID:PI_SUBAGENT_CHILD_AGENT:PI_SUBAGENT_CHILD_INDEX`), so it needs no extra operator setup. -- The **lead agent is never pinned** — only subagents get a session, so primary requests stay load-balanced. -- Close fires on `agent_end` (with `session_shutdown` as a backstop). If neither lands, Dynamo's idle timeout reaps the session; tune it with `DYN_AGENT_SESSION_TIMEOUT`. +- The child `trajectory_id` is the subagent's own identity (`PI_SUBAGENT_RUN_ID:PI_SUBAGENT_CHILD_AGENT:PI_SUBAGENT_CHILD_INDEX`), so it needs no extra operator setup. +- `parent_trajectory_id` is lineage only: it is present in subagents and absent in the root. +- Subagent finality fires on `agent_end` (with `session_shutdown` as a backstop). Root finality fires only on `session_shutdown` reason `quit`. -Requires a Dynamo frontend in `--router-mode kv` and an SGLang worker launched with `--enable-streaming-session` (SGLang ≥ 0.5.11). Against any other backend the `session_control` hint is ignored, so it is always safe to leave on. +Requires a Dynamo frontend in `--router-mode kv` and an SGLang worker launched with `--enable-session-radix-cache`. Against any other backend the `agent_context` metadata remains trace-only. -> The provider also links parent/child **trajectory ids** for tracing when `DYN_AGENT_TRAJECTORY_ID` is set on the root. This is independent of KV isolation — see [Trajectory linking](#trajectory-linking). +> The provider also links parent/child **trajectory ids** for tracing when `DYN_AGENT_TRAJECTORY_ID` is set on the root. See [Trajectory linking](#trajectory-linking). ## Configuration -The only thing you must set is the connection (`DYNAMO_BASE_URL`) and, to enable the agentic features, `DYN_AGENT_TRACE`. Everything below is an optional override. +The only thing you must set is the connection (`DYNAMO_BASE_URL`) and, to enable the agentic features, `DYN_REQUEST_TRACE`. 
Everything below is an optional override. | Variable | Default | Purpose | | --- | --- | --- | | `DYNAMO_BASE_URL` | `http://127.0.0.1:8000/v1` | Dynamo endpoint root (falls back to `OPENAI_BASE_URL`). | | `DYNAMO_API_KEY` | `dynamo-local` | Bearer token. | -| `DYN_AGENT_TRACE` | off | **Master switch.** When truthy (`1`/`true`/`yes`/`on`), enables `agent_context`, subagent session_control, and the tool relay. | +| `DYN_REQUEST_TRACE` | off | **Master switch.** When truthy (`1`/`true`/`yes`/`on`), enables `agent_context`, trajectory finality, and the tool relay. | | `DYN_AGENT_SESSION_TYPE_ID` | `pi_coding_agent` | Session class in the trace. | | `DYN_AGENT_SESSION_ID` | Pi session id | Top-level run id. | | `DYN_AGENT_TRAJECTORY_ID` | Pi session id | Trajectory id; also enables parent/child [trajectory linking](#trajectory-linking) for subagents. | | `DYN_AGENT_PARENT_TRAJECTORY_ID` | unset | Parent trajectory; set manually to override the bridge. | -| `DYN_AGENT_SESSION_TIMEOUT` | Dynamo default (300s) | Idle timeout (seconds) sent on a subagent session open. | -| `DYN_AGENT_TOOL_EVENTS_ZMQ_ENDPOINT` | unset | Dynamo-bound ZMQ PULL endpoint for the tool relay (aliases: `DYN_AGENT_TRACE_TOOL_ZMQ_ENDPOINT`, `DYN_AGENT_TRACE_TOOL_EVENTS_ZMQ_ENDPOINT`). | +| `DYN_REQUEST_TRACE_TOOL_EVENTS_ZMQ_ENDPOINT` | unset | Dynamo-bound ZMQ PULL endpoint for the tool relay. | -`PI_SUBAGENT_CHILD` / `PI_SUBAGENT_RUN_ID` / `PI_SUBAGENT_CHILD_AGENT` / `PI_SUBAGENT_CHILD_INDEX` are **read, never set** — pi-subagents populates them and the provider uses them to derive the subagent session id and trajectory link. +`PI_SUBAGENT_CHILD` / `PI_SUBAGENT_RUN_ID` / `PI_SUBAGENT_CHILD_AGENT` / `PI_SUBAGENT_CHILD_INDEX` are **read, never set** — pi-subagents populates them and the provider uses them to derive the child `trajectory_id` and parent link.
Injected request metadata -With `DYN_AGENT_TRACE` on, each request payload gets: +With `DYN_REQUEST_TRACE` on, each request payload gets: ```json { @@ -99,13 +99,12 @@ With `DYN_AGENT_TRACE` on, each request payload gets: "session_id": "", "trajectory_id": "", "phase": "reasoning" - }, - "session_control": { "session_id": "run-1:researcher:0", "action": "open" } + } } } ``` -`session_control` appears only for pi-subagents children. Existing `nvext` fields are preserved, and `x-request-id` is added when absent. +Existing `nvext` fields are preserved, and `x-request-id` is added when absent. Subagent requests include `parent_trajectory_id`; final requests also include `trajectory_final: true`.
@@ -114,15 +113,15 @@ With `DYN_AGENT_TRACE` on, each request payload gets: When a tool-event endpoint is set, Pi connects a ZMQ PUSH socket and sends one multipart message per event: ```text -[topic, seq_be_u64, msgpack(AgentTraceRecord)] +[topic, seq_be_u64, msgpack(RequestTraceRecord)] ``` -The record uses Dynamo's `dynamo.agent.trace.v1` schema (`event_type`, `agent_context`, and a `tool` object with timing/status). Dynamo owns the PULL bind side, so multiple Pi processes and subagents can all connect as producers. Terminal `tool_end` / `tool_error` records are self-contained. +The record uses Dynamo's `dynamo.request.trace.v1` schema (`event_type`, `event_source`, `agent_context`, and a `tool` object with timing/status). Dynamo owns the PULL bind side, so multiple Pi processes and subagents can all connect as producers. Terminal `tool_end` / `tool_error` records are self-contained.
## Trajectory linking -For tracing (not KV isolation), the provider keeps parent and child trajectory ids distinct. When a pi-subagents child inherits the parent's `DYN_AGENT_TRAJECTORY_ID`, the provider reinterprets it as the child's `parent_trajectory_id` and synthesizes a fresh child `trajectory_id` (`runId:childAgent:childIndex`), mutating `process.env` so nested chains stay attributable. Setting `DYN_AGENT_PARENT_TRAJECTORY_ID` manually disables this. If you don't set `DYN_AGENT_TRAJECTORY_ID` at all, every agent simply uses its own Pi session id and the trace still works — only the explicit parent→child link is absent. +The provider keeps parent and child trajectory ids distinct. When a pi-subagents child inherits the parent's `DYN_AGENT_TRAJECTORY_ID`, the provider reinterprets it as the child's `parent_trajectory_id` and synthesizes a fresh child `trajectory_id` (`runId:childAgent:childIndex`), mutating `process.env` so nested chains stay attributable. Setting `DYN_AGENT_PARENT_TRAJECTORY_ID` manually overrides the parent link. If you don't set `DYN_AGENT_TRAJECTORY_ID` at all, every subagent still gets its own child trajectory id — only the explicit parent→child link is absent. ## Local Dynamo @@ -133,7 +132,7 @@ Two helper scripts onboard a local Dynamo for testing: ./scripts/launch-agg-agent.sh # serve GLM-4.7-Flash: one frontend + one SGLang worker ``` -`launch-agg-agent.sh` uses file discovery + TCP + ZMQ (no NATS/etcd), enables streaming sessions and JSONL tracing, and prints the exact Pi env to use. Common overrides: +`launch-agg-agent.sh` uses file discovery + TCP + ZMQ (no NATS/etcd), enables session radix cache and JSONL tracing, and prints the exact Pi env to use. 
Common overrides: ```bash ./scripts/launch-agg-agent.sh --gpu 1 # different single GPU @@ -141,7 +140,7 @@ Two helper scripts onboard a local Dynamo for testing: ./scripts/launch-agg-agent.sh -- --disable-cuda-graph # forward flags to dynamo.sglang ``` -> Subagent KV isolation additionally needs `--router-mode kv` on the frontend (which requires a NATS event plane). The default launcher is the no-NATS tracing setup; switch the event plane to `nats` and add `--router-mode kv` to exercise session_control end to end. +> Trajectory-native release additionally needs `--router-mode kv` on the frontend so Dynamo can route the internal close to the worker that owns the tag. ## Development @@ -158,10 +157,10 @@ npm run build # -> dist/ - **`/v1/models` empty** — wait for the backend to load; confirm frontend and worker share the same discovery/request/event planes and `DYN_FILE_KV`. - **Model unknown** — `curl "$DYNAMO_BASE_URL/models"` and use the returned id as `dynamo/`; restart Pi if discovery failed before Dynamo was ready. -- **No agent_context / 400 on requests** — make sure `DYN_AGENT_TRACE` is set; the provider injects nothing without it. +- **No agent_context / 400 on requests** — make sure `DYN_REQUEST_TRACE` is set; the provider injects nothing without it. - **Tool spans missing** — set a tool-event endpoint on both sides and confirm the run actually used tools. -- **No subagent sessions** — needs `DYN_AGENT_TRACE=1`, a pi-subagents child (`PI_SUBAGENT_*` populated), `--router-mode kv`, and a worker with `--enable-streaming-session`. +- **No trajectory release** — needs `DYN_REQUEST_TRACE=1`, `--router-mode kv`, and a worker with `--enable-session-radix-cache`. ## Scope -No `pi-mono` core changes, no native Rust ABI, no Dynamo launch management beyond the helper scripts. The `nvext` and `agent_trace.v1` schemas are owned upstream by Dynamo. +No `pi-mono` core changes, no native Rust ABI, no Dynamo launch management beyond the helper scripts. 
The `nvext` and `request.trace.v1` schemas are owned upstream by Dynamo. diff --git a/scripts/install-dynamo.sh b/scripts/install-dynamo.sh index 70c94cf..33b3d44 100755 --- a/scripts/install-dynamo.sh +++ b/scripts/install-dynamo.sh @@ -20,7 +20,7 @@ usage() { cat <<'EOF' Usage: scripts/install-dynamo.sh [OPTIONS] -Clone Dynamo, check out the agent trace/replay branch, create a uv venv, build +Clone Dynamo, check out the request trace/replay branch, create a uv venv, build the Python bindings, and install Dynamo into the venv. Options: diff --git a/scripts/integration-smoke.sh b/scripts/integration-smoke.sh index 83e9183..8e615bf 100755 --- a/scripts/integration-smoke.sh +++ b/scripts/integration-smoke.sh @@ -26,7 +26,7 @@ readonly REPO_ROOT=$(cd "${SCRIPT_DIR}/.." && pwd) : "${TRACE_DIR:=$(mktemp -d -t pi-dynamo-smoke-XXXXXX)}" : "${DYNAMO_TIMEOUT_SECS:=120}" -readonly TRACE_PATH="${TRACE_DIR}/dynamo-agent-trace.jsonl" +readonly TRACE_PATH="${TRACE_DIR}/dynamo-request-trace.jsonl" readonly FRONTEND_LOG="${TRACE_DIR}/frontend.log" readonly MOCKER_LOG="${TRACE_DIR}/mocker.log" @@ -74,10 +74,10 @@ echo "smoke: building pi-dynamo-provider" # Trace sink config. flush interval kept short so the smoke test doesn't race # the writer when reading the JSONL between requests. -export DYN_AGENT_TRACE=1 -export DYN_AGENT_TRACE_SINKS=jsonl -export DYN_AGENT_TRACE_OUTPUT_PATH="${TRACE_PATH}" -export DYN_AGENT_TRACE_JSONL_FLUSH_INTERVAL_MS=100 +export DYN_REQUEST_TRACE=1 +export DYN_REQUEST_TRACE_SINKS=jsonl +export DYN_REQUEST_TRACE_OUTPUT_PATH="${TRACE_PATH}" +export DYN_REQUEST_TRACE_JSONL_FLUSH_INTERVAL_MS=100 # Local transport plane — no NATS, no etcd. file-backed discovery + tcp + zmq. 
export DYN_DISCOVERY_BACKEND=file @@ -121,7 +121,7 @@ fi export DYNAMO_BASE_URL="http://127.0.0.1:${DYNAMO_FRONTEND_PORT}/v1" export DYNAMO_TEST_MODEL_ID -export DYN_AGENT_TRACE_OUTPUT_PATH +export DYN_REQUEST_TRACE_OUTPUT_PATH echo "smoke: running assertions" node "${REPO_ROOT}/test/integration/smoke.mjs" diff --git a/scripts/launch-agg-agent.sh b/scripts/launch-agg-agent.sh index 7745957..a48160c 100755 --- a/scripts/launch-agg-agent.sh +++ b/scripts/launch-agg-agent.sh @@ -45,7 +45,7 @@ Environment overrides: TP DYN_HTTP_PORT DYN_SYSTEM_PORT - DYN_AGENT_TRACE_OUTPUT_PATH + DYN_REQUEST_TRACE_OUTPUT_PATH Examples: scripts/launch-agg-agent.sh @@ -141,7 +141,7 @@ DYNAMO_DIR="${DYNAMO_DIR:-$WORKDIR/dynamo}" RUN_ID="$(date -u +%Y%m%dT%H%M%SZ)" RUN_DIR="$WORKDIR/runs/$RUN_ID" LOG_DIR="$RUN_DIR/logs" -TRACE_PATH="${DYN_AGENT_TRACE_OUTPUT_PATH:-$RUN_DIR/dynamo-agent-trace.jsonl}" +TRACE_PATH="${DYN_REQUEST_TRACE_OUTPUT_PATH:-$RUN_DIR/dynamo-request-trace.jsonl}" FILE_KV="${DYN_FILE_KV:-$RUN_DIR/file-kv}" CHILD_PIDS=() @@ -164,7 +164,7 @@ Pi environment for another shell: export DYNAMO_API_KEY=dummy export DYN_AGENT_SESSION_TYPE_ID=pi_coding_agent export DYN_AGENT_SESSION_ID=pi-demo-${RUN_ID} - export DYN_AGENT_TOOL_EVENTS_ZMQ_ENDPOINT=tcp://127.0.0.1:20390 + export DYN_REQUEST_TRACE_TOOL_EVENTS_ZMQ_ENDPOINT=tcp://127.0.0.1:20390 Example Pi command: @@ -182,7 +182,7 @@ Perfetto conversion: ${TRACE_PATH} \\ --include-markers \\ --separate-stage-tracks \\ - --output ${RUN_DIR}/dynamo-agent-trace.perfetto.json + --output ${RUN_DIR}/dynamo-request-trace.perfetto.json EOF } @@ -219,10 +219,10 @@ export DYN_DISCOVERY_BACKEND=file export DYN_REQUEST_PLANE=tcp export DYN_EVENT_PLANE=zmq export DYN_FILE_KV="$FILE_KV" -export DYN_AGENT_TRACE=1 -export DYN_AGENT_TRACE_SINKS="${DYN_AGENT_TRACE_SINKS:-jsonl}" -export DYN_AGENT_TRACE_OUTPUT_PATH="$TRACE_PATH" -export DYN_AGENT_TRACE_JSONL_FLUSH_INTERVAL_MS="${DYN_AGENT_TRACE_JSONL_FLUSH_INTERVAL_MS:-100}" +export DYN_REQUEST_TRACE=1 
+export DYN_REQUEST_TRACE_SINKS="${DYN_REQUEST_TRACE_SINKS:-jsonl}" +export DYN_REQUEST_TRACE_OUTPUT_PATH="$TRACE_PATH" +export DYN_REQUEST_TRACE_JSONL_FLUSH_INTERVAL_MS="${DYN_REQUEST_TRACE_JSONL_FLUSH_INTERVAL_MS:-100}" export DYN_LOG="${DYN_LOG:-info}" log "Run directory: $RUN_DIR" @@ -241,10 +241,12 @@ python3 -m dynamo.frontend \ --discovery-backend file \ --request-plane tcp \ --event-plane zmq \ - --router-mode round-robin \ + --router-mode kv \ + --kv-cache-block-size 16 \ >"$LOG_DIR/frontend.log" 2>&1 & CHILD_PIDS+=("$!") +KV_EVENTS_CONFIG='{"publisher":"zmq","topic":"kv-events","endpoint":"tcp://*:5557","enable_kv_cache_events":true}' log "Starting Dynamo SGLang worker" DYN_SYSTEM_PORT="$SYSTEM_PORT" \ python3 -m dynamo.sglang \ @@ -256,8 +258,8 @@ python3 -m dynamo.sglang \ --page-size 16 \ --tp "$TP" \ --trust-remote-code \ - --enable-streaming-session \ - --skip-tokenizer-init \ + --enable-session-radix-cache \ + --kv-events-config "$KV_EVENTS_CONFIG" \ --dyn-reasoning-parser glm45 \ --dyn-tool-call-parser glm47 \ --enable-metrics \ diff --git a/src/dynamo-provider.ts b/src/dynamo-provider.ts index 3565562..f72dddd 100644 --- a/src/dynamo-provider.ts +++ b/src/dynamo-provider.ts @@ -24,19 +24,16 @@ export interface DynamoEnvironment { DYNAMO_BASE_URL?: string; OPENAI_BASE_URL?: string; DYNAMO_API_KEY?: string; - // Master switch for the provider's agentic emissions. When truthy the - // provider injects nvext.agent_context, drives subagent session_control, and - // (if an endpoint is set) the tool-event relay — all with sensible defaults - // that the more specific DYN_AGENT_* / DYNAMO_* vars below override. When - // unset/falsy the provider is just a plain `dynamo/` provider. - DYN_AGENT_TRACE?: string; + // Master switch for the provider's request-trace emissions. 
When truthy the + // provider injects nvext.agent_context and (if an endpoint is set) the + // tool-event relay — all with sensible defaults that the more specific + // DYN_AGENT_* / DYNAMO_* vars below override. When unset/falsy the provider + // is just a plain `dynamo/` provider. + DYN_REQUEST_TRACE?: string; DYN_AGENT_SESSION_TYPE_ID?: string; DYN_AGENT_SESSION_ID?: string; DYN_AGENT_TRAJECTORY_ID?: string; DYN_AGENT_PARENT_TRAJECTORY_ID?: string; - // Per-session inactivity timeout (seconds) sent on the streaming-session - // open. Omitted when unset so Dynamo applies its own 300s default. - DYN_AGENT_SESSION_TIMEOUT?: string; // pi-subagents bookkeeping. pi-subagents spawns each child agent as a // node child_process with `{ ...process.env, ...subagentEnv }`, so the // parent's DYN_AGENT_TRAJECTORY_ID arrives in the child unchanged — @@ -51,29 +48,14 @@ export interface DynamoEnvironment { export interface DynamoProviderRuntimeConfig { baseUrl: string; apiKey: string; - // DYN_AGENT_TRACE master switch. Gates agent_context, session_control, and + // DYN_REQUEST_TRACE master switch. Gates agent_context and // the tool relay; the model provider itself is registered regardless. traceEnabled: boolean; sessionTypeId: string; sessionId?: string; trajectoryId?: string; parentTrajectoryId?: string; - // Streaming-session id for subagent KV isolation. Set whenever this process - // is a pi-subagents child (derived from PI_SUBAGENT_* alone — independent of - // the trajectory bridge). One child process == one Dynamo streaming session. - sessionControlId?: string; - // Inactivity timeout (seconds) for that session, when DYN_AGENT_SESSION_TIMEOUT - // is set. Omitted otherwise so Dynamo's 300s default applies. - sessionTimeoutSecs?: number; -} - -// nvext.session_control: sticky routing + SGLang streaming-session KV isolation -// for subagents. Sibling of agent_context on the request payload. action is -// omitted on intermediate turns; timeout only matters on the open. 
-export interface DynamoSessionControl { - session_id: string; - action?: "open" | "close"; - timeout?: number; + isSubagent?: boolean; } export interface DynamoAgentContext { @@ -127,16 +109,13 @@ function isTruthyEnv(value: string | undefined): boolean { } /** - * The subagent's stable, unique streaming-session id, derived purely from the - * pi-subagents `PI_SUBAGENT_*` bookkeeping. This is the natural key for KV - * isolation and is deliberately independent of `DYN_AGENT_TRAJECTORY_ID` / - * tracing — a subagent gets its own session whether or not anyone set up a - * trajectory lineage. Returns `undefined` outside a pi-subagents child or when - * the identity is incomplete (no run id / agent name). + * The subagent's stable trajectory id, derived purely from the pi-subagents + * `PI_SUBAGENT_*` bookkeeping. Returns `undefined` outside a pi-subagents child + * or when the identity is incomplete (no run id / agent name). * * `PI_SUBAGENT_CHILD_INDEX` defaults to `"0"` when absent. */ -export function computeSubagentSessionId(env: DynamoEnvironment): string | undefined { +export function computeSubagentTrajectoryId(env: DynamoEnvironment): string | undefined { if (getEnvValue(env, "PI_SUBAGENT_CHILD") !== "1") return undefined; const runId = getEnvValue(env, "PI_SUBAGENT_RUN_ID"); const childAgent = getEnvValue(env, "PI_SUBAGENT_CHILD_AGENT"); @@ -148,27 +127,25 @@ export function computeSubagentSessionId(env: DynamoEnvironment): string | undef /** * Compute the trajectory rewrite that pi-subagents inheritance implies, without * mutating any caller-visible state. Pure: takes the raw env, returns either - * `null` (no rewrite applies) or `{ trajectoryId, parentTrajectoryId }` where - * `trajectoryId` is the synthesized child id and `parentTrajectoryId` is the - * inherited parent id. + * `null` (not a pi-subagents child) or the child's trajectory id plus optional + * parent id. * - * This is the TRACING bridge — distinct from `computeSubagentSessionId`. 
It - * fires only when ALL of these hold: - * - `PI_SUBAGENT_CHILD === "1"` (this process was spawned by pi-subagents) - * - inherited `DYN_AGENT_TRAJECTORY_ID` is set (the parent's id we want to - * reinterpret as this child's parent) - * - `DYN_AGENT_PARENT_TRAJECTORY_ID` is NOT already set (manual override wins) - * - the subagent identity is complete (`computeSubagentSessionId` resolves) + * `PI_SUBAGENT_CHILD === "1"` switches the current process identity from root + * to child: `trajectory_id` becomes the child id, and the inherited + * `DYN_AGENT_TRAJECTORY_ID` becomes `parent_trajectory_id` unless an explicit + * parent override is already present. */ export function computeSubagentTrajectoryRewrite( env: DynamoEnvironment, -): { trajectoryId: string; parentTrajectoryId: string } | null { - if (getEnvValue(env, "DYN_AGENT_PARENT_TRAJECTORY_ID")) return null; - const inherited = getEnvValue(env, "DYN_AGENT_TRAJECTORY_ID"); - if (!inherited) return null; - const trajectoryId = computeSubagentSessionId(env); +): { trajectoryId: string; parentTrajectoryId?: string } | null { + const trajectoryId = computeSubagentTrajectoryId(env); if (!trajectoryId) return null; - return { parentTrajectoryId: inherited, trajectoryId }; + const parentTrajectoryId = + getEnvValue(env, "DYN_AGENT_PARENT_TRAJECTORY_ID") ?? getEnvValue(env, "DYN_AGENT_TRAJECTORY_ID"); + return { + trajectoryId, + ...(parentTrajectoryId ? { parentTrajectoryId } : {}), + }; } /** @@ -178,9 +155,8 @@ export function computeSubagentTrajectoryRewrite( * generation would observe the original grandparent as its parent and the * middle generations would be invisible in the dynamo trace. * - * Idempotent: a second call has no effect because the rewrite condition - * requires `DYN_AGENT_PARENT_TRAJECTORY_ID` to be absent, which it isn't after - * the first call. Safe to invoke from extension init. 
+ * Idempotent: a second call has no effect once the env already contains the + * computed child trajectory and parent link. Safe to invoke from extension init. * * Mutates the supplied env object in place (defaults to `process.env`); also * returns whether a rewrite was applied so callers can log/test. @@ -188,7 +164,13 @@ export function computeSubagentTrajectoryRewrite( export function applySubagentBridge(env: NodeJS.ProcessEnv = process.env): boolean { const rewrite = computeSubagentTrajectoryRewrite(env); if (!rewrite) return false; - env.DYN_AGENT_PARENT_TRAJECTORY_ID = rewrite.parentTrajectoryId; + if ( + getEnvValue(env, "DYN_AGENT_TRAJECTORY_ID") === rewrite.trajectoryId && + getEnvValue(env, "DYN_AGENT_PARENT_TRAJECTORY_ID") === rewrite.parentTrajectoryId + ) { + return false; + } + if (rewrite.parentTrajectoryId) env.DYN_AGENT_PARENT_TRAJECTORY_ID = rewrite.parentTrajectoryId; env.DYN_AGENT_TRAJECTORY_ID = rewrite.trajectoryId; return true; } @@ -201,48 +183,36 @@ export function applySubagentBridge(env: NodeJS.ProcessEnv = process.env): boole * (no `parent_trajectory_id`). Only the ROOT seeds — a pi-subagents child already * inherits its parent's id, and a caller-set id wins. Uses `DYN_AGENT_SESSION_ID` * when present (root trajectory == its session) else a fresh id. Gated on - * `DYN_AGENT_TRACE`. Mutates env in place; must run before any subagent spawn. + * `DYN_REQUEST_TRACE`. Mutates env in place; must run before any subagent spawn. * Returns whether a seed was written. */ export function seedRootTrajectory( env: NodeJS.ProcessEnv = process.env, mkId: () => string = randomUUID, ): boolean { - if (!isTruthyEnv(getEnvValue(env, "DYN_AGENT_TRACE"))) return false; + if (!isTruthyEnv(getEnvValue(env, "DYN_REQUEST_TRACE"))) return false; if (getEnvValue(env, "PI_SUBAGENT_CHILD") === "1") return false; if (getEnvValue(env, "DYN_AGENT_TRAJECTORY_ID")) return false; env.DYN_AGENT_TRAJECTORY_ID = getEnvValue(env, "DYN_AGENT_SESSION_ID") ?? 
mkId(); return true; } -function parsePositiveIntOrUndefined(value: string | undefined): number | undefined { - if (value === undefined) return undefined; - const parsed = Number.parseInt(value, 10); - return Number.isFinite(parsed) && parsed > 0 ? parsed : undefined; -} - export function readDynamoConfig(env: DynamoEnvironment = process.env): DynamoProviderRuntimeConfig { const rewrite = computeSubagentTrajectoryRewrite(env); const sessionId = getEnvValue(env, "DYN_AGENT_SESSION_ID"); const trajectoryId = rewrite?.trajectoryId ?? getEnvValue(env, "DYN_AGENT_TRAJECTORY_ID"); const parentTrajectoryId = rewrite?.parentTrajectoryId ?? getEnvValue(env, "DYN_AGENT_PARENT_TRAJECTORY_ID"); - // Only a pi-subagents child gets a streaming session, keyed on the subagent's - // own identity — independent of the trajectory bridge, so the lead agent - // stays load-balanced while each subagent pins regardless of DYN_AGENT_*. - const sessionControlId = computeSubagentSessionId(env); - const sessionTimeoutSecs = parsePositiveIntOrUndefined(getEnvValue(env, "DYN_AGENT_SESSION_TIMEOUT")); return { baseUrl: normalizeDynamoBaseUrl(getEnvValue(env, "DYNAMO_BASE_URL") ?? getEnvValue(env, "OPENAI_BASE_URL")), apiKey: getEnvValue(env, "DYNAMO_API_KEY") ?? DEFAULT_DYNAMO_API_KEY, - traceEnabled: isTruthyEnv(getEnvValue(env, "DYN_AGENT_TRACE")), + traceEnabled: isTruthyEnv(getEnvValue(env, "DYN_REQUEST_TRACE")), sessionTypeId: getEnvValue(env, "DYN_AGENT_SESSION_TYPE_ID") ?? DEFAULT_SESSION_TYPE_ID, ...(sessionId ? { sessionId } : {}), ...(trajectoryId ? { trajectoryId } : {}), ...(parentTrajectoryId ? { parentTrajectoryId } : {}), - ...(sessionControlId ? { sessionControlId } : {}), - ...(sessionTimeoutSecs !== undefined ? { sessionTimeoutSecs } : {}), + isSubagent: rewrite !== null, }; } @@ -252,7 +222,7 @@ export function buildDynamoAgentContext( ): DynamoAgentContext { // session_id and trajectory_id both default to Pi's own session id when not // pinned via DYN_AGENT_*. 
Dynamo's AgentContext requires session_id, so a - // default keeps the payload valid with zero operator env beyond DYN_AGENT_TRACE. + // default keeps the payload valid with zero operator env beyond DYN_REQUEST_TRACE. const trajectoryId = config.trajectoryId ?? options?.sessionId; const sessionId = config.sessionId ?? options?.sessionId; return { @@ -285,23 +255,6 @@ export function mergeDynamoAgentContext(payload: unknown, agentContext: DynamoAg }; } -export function mergeDynamoSessionControl(payload: unknown, sessionControl: DynamoSessionControl): unknown { - const payloadRecord = isRecord(payload) ? payload : {}; - const existingNvext = isRecord(payloadRecord.nvext) ? payloadRecord.nvext : {}; - const existingSessionControl = isRecord(existingNvext.session_control) ? existingNvext.session_control : {}; - - return { - ...payloadRecord, - nvext: { - ...existingNvext, - session_control: { - ...sessionControl, - ...existingSessionControl, - }, - }, - }; -} - function hasHeader(headers: Record, target: string): boolean { const normalizedTarget = target.toLowerCase(); return Object.keys(headers).some((key) => key.toLowerCase() === normalizedTarget); @@ -385,86 +338,34 @@ function toOpenAICompletionsModel(model: Model): Model<"openai-completions" type FetchLike = (input: string, init: RequestInit) => Promise<{ ok: boolean; status: number }>; -/** - * One session for one pi-subagents child process. The radix-native backend has - * no open step (the tag is the only session state), so the lifecycle is: - * - every turn -> bare session_id: tags that turn's KV as ordinary, evictable - * radix (LRU-neutral — no pinned slot), and keys sticky routing to one worker. - * - on agent_end -> a throwaway max_tokens=1 request carries action "close", - * bulk-freeing this session's tagged KV instead of waiting for LRU. - * Dynamo has no standalone close RPC — close must ride a routed request. If close - * never lands (e.g. SIGKILL), the KV is ordinary radix and LRU reclaims it. 
- */ -export class DynamoSubagentSession { - readonly sessionId: string; - modelId = ""; - private readonly baseUrl: string; - private readonly apiKey: string; - private readonly timeoutSecs: number | undefined; - private readonly createRequestId: () => string; - private opened = false; - - constructor( - config: Pick & { - sessionControlId: string; - sessionTimeoutSecs?: number; - }, - createRequestId: () => string = randomUUID, - ) { - this.sessionId = config.sessionControlId; - this.baseUrl = config.baseUrl; - this.apiKey = config.apiKey; - this.timeoutSecs = config.sessionTimeoutSecs; - this.createRequestId = createRequestId; - } - - /** Build session_control for the current turn. No open step under radix-native: - * the bare session_id tags this turn's KV; `opened` only gates the close. */ - controlForTurn(): DynamoSessionControl { - this.opened = true; - return { - session_id: this.sessionId, - ...(this.timeoutSecs !== undefined ? { timeout: this.timeoutSecs } : {}), - }; - } - - /** - * Fire-and-forget close. Best-effort: skipped if no turn opened the session, - * and any transport error is swallowed (KV cleanup is best-effort — never - * block Pi's shutdown on it). The 5s timeout bounds a hung frontend; the idle - * reaper covers the case where this never lands. - * - * Re-armable: clearing `opened` synchronously both guards a double-fire - * (agent_end then session_shutdown) and lets a later turn re-tag a fresh - * session. agent_end fires once per prompt, so a multi-prompt subagent frees - * its KV between prompts and re-warms on the next one's first tagged turn. - */ - async close(fetchImpl: FetchLike = fetch): Promise { - if (!this.opened) return false; - this.opened = false; - const sessionControl: DynamoSessionControl = { session_id: this.sessionId, action: "close" }; - const body = { - model: this.modelId, - messages: [{ role: "user", content: "." 
}], - max_tokens: 1, - stream: false, - nvext: { session_control: sessionControl }, - }; - try { - const response = await fetchImpl(`${this.baseUrl}/chat/completions`, { - method: "POST", - headers: { - "content-type": "application/json", - authorization: `Bearer ${this.apiKey}`, - "x-request-id": this.createRequestId(), - }, - body: JSON.stringify(body), - signal: AbortSignal.timeout(5000), - }); - return response.ok; - } catch { - return false; - } +export async function sendTrajectoryFinal( + config: DynamoProviderRuntimeConfig, + modelId: string, + createRequestId: () => string = randomUUID, + fetchImpl: FetchLike = fetch, +): Promise { + const agentContext = { ...buildDynamoAgentContext(config), trajectory_final: true }; + if (!agentContext.trajectory_id) return false; + try { + const response = await fetchImpl(`${config.baseUrl}/chat/completions`, { + method: "POST", + headers: { + "content-type": "application/json", + authorization: `Bearer ${config.apiKey}`, + "x-request-id": createRequestId(), + }, + body: JSON.stringify({ + model: modelId, + messages: [{ role: "user", content: "." 
}], + max_tokens: 1, + stream: false, + nvext: { agent_context: agentContext }, + }), + signal: AbortSignal.timeout(5000), + }); + return response.ok; + } catch { + return false; } } @@ -472,7 +373,6 @@ export function createDynamoStreamSimple( config: DynamoProviderRuntimeConfig, delegate: OpenAICompletionsStreamSimple = streamSimpleOpenAICompletions, createRequestId: () => string = randomUUID, - session?: DynamoSubagentSession, ): ProviderStreamSimple { return (model: Model, context: Context, options?: SimpleStreamOptions): AssistantMessageEventStream => { const openAIModel = toOpenAICompletionsModel(model); @@ -483,7 +383,7 @@ export function createDynamoStreamSimple( headers, }; - // DYN_AGENT_TRACE off: behave as a plain dynamo/ provider — still + // DYN_REQUEST_TRACE off: behave as a plain dynamo/ provider — still // add x-request-id for correlation, but inject no agentic nvext. if (!config.traceEnabled) { return delegate(openAIModel, context, baseOptions); @@ -491,16 +391,11 @@ export function createDynamoStreamSimple( const agentContext = buildDynamoAgentContext(config, options); const previousOnPayload = options?.onPayload; - // Capture the live model id so the deferred close request targets a real - // model, and advance the open->sticky transition once per turn. - if (session) session.modelId = model.id; - const sessionControl = session?.controlForTurn(); return delegate(openAIModel, context, { ...baseOptions, onPayload: async (payload) => { - let injectedPayload = mergeDynamoAgentContext(payload, agentContext); - if (sessionControl) injectedPayload = mergeDynamoSessionControl(injectedPayload, sessionControl); + const injectedPayload = mergeDynamoAgentContext(payload, agentContext); return (await previousOnPayload?.(injectedPayload, model)) ?? 
injectedPayload; }, }); @@ -510,7 +405,6 @@ export function createDynamoStreamSimple( export function createDynamoProviderConfig( config: DynamoProviderRuntimeConfig, models: ProviderModelConfig[], - session?: DynamoSubagentSession, ): ProviderConfig { return { name: "Dynamo", @@ -518,6 +412,6 @@ export function createDynamoProviderConfig( apiKey: config.apiKey, api: DYNAMO_API, models, - streamSimple: createDynamoStreamSimple(config, streamSimpleOpenAICompletions, randomUUID, session), + streamSimple: createDynamoStreamSimple(config), }; } diff --git a/src/index.ts b/src/index.ts index 279dc70..e8dcec5 100644 --- a/src/index.ts +++ b/src/index.ts @@ -5,15 +5,14 @@ import { randomUUID } from "node:crypto"; import type { ExtensionAPI } from "@mariozechner/pi-coding-agent"; import { applySubagentBridge, - buildDynamoAgentContext, createDynamoModels, createDynamoProviderConfig, DEFAULT_DYNAMO_MODEL_ID, DYNAMO_PROVIDER_ID, - DynamoSubagentSession, discoverDynamoModels, readDynamoConfig, seedRootTrajectory, + sendTrajectoryFinal, } from "./dynamo-provider.js"; import { registerDynamoToolEventRelay } from "./tool-relay.js"; @@ -33,44 +32,14 @@ export default async function dynamoProviderExtension(pi: ExtensionAPI): Promise const models = discoveredModels.length > 0 ? discoveredModels : createDynamoModels([DEFAULT_DYNAMO_MODEL_ID], config.baseUrl); - // DYN_AGENT_TRACE gates the agentic emissions. A pi-subagents child gets a - // streaming session keyed on its own identity; the lead agent stays unpinned. - const session = - config.traceEnabled && config.sessionControlId - ? new DynamoSubagentSession({ - baseUrl: config.baseUrl, - apiKey: config.apiKey, - sessionControlId: config.sessionControlId, - ...(config.sessionTimeoutSecs !== undefined ? 
{ sessionTimeoutSecs: config.sessionTimeoutSecs } : {}), - }) - : undefined; - - pi.registerProvider(DYNAMO_PROVIDER_ID, createDynamoProviderConfig(config, models, session)); + pi.registerProvider(DYNAMO_PROVIDER_ID, createDynamoProviderConfig(config, models)); if (config.traceEnabled) { await registerDynamoToolEventRelay(pi, config); } - if (session) { - // agent_end fires when this subagent's loop finishes, while the event loop - // is still alive — so we can await the close request and free KV - // deterministically. session_shutdown is the teardown-time backstop; both - // are idempotent. If neither lands (e.g. SIGKILL), Dynamo's idle timeout - // reaps the session. - pi.on("agent_end", async () => { - await session.close(); - }); - pi.on("session_shutdown", async () => { - await session.close(); - }); - } - - // Program close (thunderagent_router): whenever agent_context is being injected - // (trace enabled + a trajectory id), release the program from the router's table - // when the whole session ends. A throwaway max_tokens=1 request carries - // agent_context.trajectory_final; the thunderagent_router short-circuits it - // (deletes the program, never forwards to the engine). Best-effort — Dynamo's - // idle reaper is the backstop if the process dies before it lands. Separate from - // session_control above: that frees SGLang KV; this frees scheduler bookkeeping. + // trajectory_final closes the current trajectory: every subagent on agent_end, + // the root only on true quit. Other session_shutdown reasons keep the same + // trajectory alive across reload/fork/new/resume flows. const programTrajectoryId = config.trajectoryId ?? config.sessionId; if (config.traceEnabled && programTrajectoryId) { const closeModelId = models[0]?.id ?? 
DEFAULT_DYNAMO_MODEL_ID; @@ -78,40 +47,16 @@ export default async function dynamoProviderExtension(pi: ExtensionAPI): Promise const closeProgram = async (): Promise => { if (programClosed) return; programClosed = true; - const agentContext = { ...buildDynamoAgentContext(config), trajectory_final: true }; - try { - await fetch(`${config.baseUrl}/chat/completions`, { - method: "POST", - headers: { - "content-type": "application/json", - authorization: `Bearer ${config.apiKey}`, - "x-request-id": randomUUID(), - }, - body: JSON.stringify({ - model: closeModelId, - messages: [{ role: "user", content: "." }], - max_tokens: 1, - stream: false, - nvext: { agent_context: agentContext }, - }), - signal: AbortSignal.timeout(5000), - }); - } catch { - // best-effort: the router's idle reaper is the safety net - } + await sendTrajectoryFinal(config, closeModelId, randomUUID); }; - // Agents here are multiturn: the whole interactive session is ONE - // trajectory/program (same trajectory_id across every prompt). Release it - // once at true teardown — NOT on agent_end, which fires per user prompt and - // would close after the first turn, dropping the program's worker/KV affinity - // mid-session (later prompts re-create an unreleased program that only decay - // reaps). Only reason "quit" means the trajectory is done in this process; - // "reload"/"fork"/"new"/"resume" keep the same trajectory_id, so the program - // continues. print-mode (batch) also emits "quit" on dispose and awaits the - // handler, so one-shot runs still close exactly once. 
- pi.on("session_shutdown", async (event) => { - if (event.reason === "quit") await closeProgram(); - }); + if (config.isSubagent) { + pi.on("agent_end", closeProgram); + pi.on("session_shutdown", closeProgram); + } else { + pi.on("session_shutdown", async (event) => { + if (event.reason === "quit") await closeProgram(); + }); + } } } diff --git a/src/tool-relay.ts b/src/tool-relay.ts index 37ce137..b2b2a40 100644 --- a/src/tool-relay.ts +++ b/src/tool-relay.ts @@ -11,13 +11,9 @@ export const DEFAULT_TOOL_EVENTS_TOPIC = "agent-tool-events"; export const DEFAULT_TOOL_EVENT_QUEUE_CAPACITY = 100000; export interface DynamoToolRelayEnvironment extends DynamoEnvironment { - DYN_AGENT_TOOL_EVENTS_ZMQ_ENDPOINT?: string; - DYN_AGENT_TRACE_TOOL_ZMQ_ENDPOINT?: string; - DYN_AGENT_TRACE_TOOL_EVENTS_ZMQ_ENDPOINT?: string; - DYN_AGENT_TOOL_EVENTS_ZMQ_TOPIC?: string; - DYN_AGENT_TRACE_TOOL_ZMQ_TOPIC?: string; - DYN_AGENT_TRACE_TOOL_EVENTS_ZMQ_TOPIC?: string; - DYN_AGENT_TOOL_EVENTS_QUEUE_CAPACITY?: string; + DYN_REQUEST_TRACE_TOOL_EVENTS_ZMQ_ENDPOINT?: string; + DYN_REQUEST_TRACE_TOOL_EVENTS_ZMQ_TOPIC?: string; + DYN_REQUEST_TRACE_TOOL_EVENTS_QUEUE_CAPACITY?: string; } export interface DynamoToolRelayConfig { @@ -26,7 +22,7 @@ export interface DynamoToolRelayConfig { queueCapacity: number; } -export interface DynamoTraceAgentContext { +export interface DynamoRequestTraceAgentContext { session_type_id: string; session_id: string; trajectory_id: string; @@ -36,7 +32,7 @@ export interface DynamoTraceAgentContext { export type DynamoToolStatus = "running" | "succeeded" | "error" | "cancelled"; export type DynamoToolTraceEventType = "tool_start" | "tool_end" | "tool_error"; -export interface DynamoAgentToolEvent { +export interface DynamoRequestTraceToolEvent { tool_call_id: string; tool_class: string; started_at_unix_ms?: number; @@ -47,13 +43,13 @@ export interface DynamoAgentToolEvent { error_type?: string; } -export interface DynamoAgentTraceRecord { - schema: 
"dynamo.agent.trace.v1"; +export interface DynamoRequestTraceRecord { + schema: "dynamo.request.trace.v1"; event_type: DynamoToolTraceEventType; event_time_unix_ms: number; event_source: "harness"; - agent_context: DynamoTraceAgentContext; - tool: DynamoAgentToolEvent; + agent_context: DynamoRequestTraceAgentContext; + tool: DynamoRequestTraceToolEvent; } export interface ToolEventSocket { @@ -78,7 +74,7 @@ export interface PiToolExecutionEndEvent { } interface ToolCallStart { - agentContext: DynamoTraceAgentContext; + agentContext: DynamoRequestTraceAgentContext; toolName: string; toolClass: string; startedAtUnixMs: number; @@ -95,17 +91,6 @@ function getEnvValue(env: DynamoToolRelayEnvironment, key: keyof DynamoToolRelay return trimmed ? trimmed : undefined; } -function firstEnvValue( - env: DynamoToolRelayEnvironment, - keys: (keyof DynamoToolRelayEnvironment)[], -): string | undefined { - for (const key of keys) { - const value = getEnvValue(env, key); - if (value) return value; - } - return undefined; -} - function parsePositiveInteger(value: string | undefined, fallback: number): number { if (!value) return fallback; const parsed = Number.parseInt(value, 10); @@ -113,31 +98,22 @@ function parsePositiveInteger(value: string | undefined, fallback: number): numb } export function readDynamoToolRelayConfig(env: DynamoToolRelayEnvironment = process.env): DynamoToolRelayConfig { - const endpoint = firstEnvValue(env, [ - "DYN_AGENT_TOOL_EVENTS_ZMQ_ENDPOINT", - "DYN_AGENT_TRACE_TOOL_ZMQ_ENDPOINT", - "DYN_AGENT_TRACE_TOOL_EVENTS_ZMQ_ENDPOINT", - ]); + const endpoint = getEnvValue(env, "DYN_REQUEST_TRACE_TOOL_EVENTS_ZMQ_ENDPOINT"); return { ...(endpoint ? { endpoint } : {}), - topic: - firstEnvValue(env, [ - "DYN_AGENT_TOOL_EVENTS_ZMQ_TOPIC", - "DYN_AGENT_TRACE_TOOL_ZMQ_TOPIC", - "DYN_AGENT_TRACE_TOOL_EVENTS_ZMQ_TOPIC", - ]) ?? DEFAULT_TOOL_EVENTS_TOPIC, + topic: getEnvValue(env, "DYN_REQUEST_TRACE_TOOL_EVENTS_ZMQ_TOPIC") ?? 
DEFAULT_TOOL_EVENTS_TOPIC, queueCapacity: parsePositiveInteger( - getEnvValue(env, "DYN_AGENT_TOOL_EVENTS_QUEUE_CAPACITY"), + getEnvValue(env, "DYN_REQUEST_TRACE_TOOL_EVENTS_QUEUE_CAPACITY"), DEFAULT_TOOL_EVENT_QUEUE_CAPACITY, ), }; } -export function buildDynamoTraceAgentContext( +export function buildDynamoRequestTraceAgentContext( config: DynamoProviderRuntimeConfig, sessionId: string | undefined, -): DynamoTraceAgentContext | undefined { +): DynamoRequestTraceAgentContext | undefined { const trajectoryId = config.trajectoryId ?? sessionId; if (!trajectoryId) return undefined; @@ -207,7 +183,7 @@ export class DynamoToolEventPublisher { await this.socket.connect(this.config.endpoint); } - publish(record: DynamoAgentTraceRecord): boolean { + publish(record: DynamoRequestTraceRecord): boolean { if (this.closed || !this.config.endpoint) return false; if (this.queued >= this.config.queueCapacity) return false; @@ -251,7 +227,7 @@ export class DynamoToolEventRelay { ) {} handleToolExecutionStart(event: PiToolExecutionStartEvent, ctx: ExtensionContext): void { - const agentContext = buildDynamoTraceAgentContext(this.config, ctx.sessionManager.getSessionId()); + const agentContext = buildDynamoRequestTraceAgentContext(this.config, ctx.sessionManager.getSessionId()); if (!agentContext) return; const startedAtUnixMs = this.nowUnixMs(); @@ -265,7 +241,7 @@ export class DynamoToolEventRelay { }); this.publisher.publish({ - schema: "dynamo.agent.trace.v1", + schema: "dynamo.request.trace.v1", event_type: "tool_start", event_time_unix_ms: startedAtUnixMs, event_source: "harness", @@ -285,7 +261,8 @@ export class DynamoToolEventRelay { const start = this.starts.get(event.toolCallId); this.starts.delete(event.toolCallId); - const agentContext = start?.agentContext ?? buildDynamoTraceAgentContext(this.config, ctx.sessionManager.getSessionId()); + const agentContext = + start?.agentContext ?? 
buildDynamoRequestTraceAgentContext(this.config, ctx.sessionManager.getSessionId()); if (!agentContext) return; const startedAtUnixMs = start?.startedAtUnixMs ?? endedAtUnixMs; @@ -296,7 +273,7 @@ export class DynamoToolEventRelay { const outputBytes = getToolResultOutputBytes(event.result); this.publisher.publish({ - schema: "dynamo.agent.trace.v1", + schema: "dynamo.request.trace.v1", event_type: event.isError ? "tool_error" : "tool_end", event_time_unix_ms: endedAtUnixMs, event_source: "harness", diff --git a/test/dynamo-provider.test.ts b/test/dynamo-provider.test.ts index 170c298..71ec32d 100644 --- a/test/dynamo-provider.test.ts +++ b/test/dynamo-provider.test.ts @@ -7,18 +7,18 @@ import { applySubagentBridge, buildDynamoAgentContext, buildDynamoHeaders, + computeSubagentTrajectoryId, computeSubagentTrajectoryRewrite, createDynamoStreamSimple, DEFAULT_DYNAMO_BASE_URL, DEFAULT_DYNAMO_MODEL_ID, DEFAULT_SESSION_TYPE_ID, DYNAMO_API, - DynamoSubagentSession, mergeDynamoAgentContext, - mergeDynamoSessionControl, normalizeDynamoBaseUrl, readDynamoConfig, seedRootTrajectory, + sendTrajectoryFinal, } from "../src/dynamo-provider.js"; // Spread `base` with the given keys dropped (env-absent). 
Avoids the @@ -36,6 +36,7 @@ const config = { apiKey: "test-key", traceEnabled: true, sessionTypeId: DEFAULT_SESSION_TYPE_ID, + isSubagent: false, }; const model = { @@ -67,7 +68,7 @@ describe("dynamo provider config", () => { OPENAI_BASE_URL: "http://ignored.test/v1", DYNAMO_BASE_URL: "http://dynamo.test", DYNAMO_API_KEY: "dyn-key", - DYN_AGENT_TRACE: "1", + DYN_REQUEST_TRACE: "1", DYN_AGENT_SESSION_TYPE_ID: "session-kind", DYN_AGENT_SESSION_ID: "session-id", DYN_AGENT_TRAJECTORY_ID: "trajectory-id", @@ -81,16 +82,17 @@ describe("dynamo provider config", () => { sessionId: "session-id", trajectoryId: "trajectory-id", parentTrajectoryId: "parent-id", + isSubagent: false, }); }); - it("treats DYN_AGENT_TRACE as a truthy master switch, default off", () => { + it("treats DYN_REQUEST_TRACE as a truthy master switch, default off", () => { expect(readDynamoConfig({}).traceEnabled).toBe(false); for (const v of ["1", "true", "TRUE", "yes", "on"]) { - expect(readDynamoConfig({ DYN_AGENT_TRACE: v }).traceEnabled).toBe(true); + expect(readDynamoConfig({ DYN_REQUEST_TRACE: v }).traceEnabled).toBe(true); } for (const v of ["0", "false", "no", ""]) { - expect(readDynamoConfig({ DYN_AGENT_TRACE: v }).traceEnabled).toBe(false); + expect(readDynamoConfig({ DYN_REQUEST_TRACE: v }).traceEnabled).toBe(false); } }); }); @@ -123,14 +125,19 @@ describe("pi-subagents trajectory bridge", () => { expect(computeSubagentTrajectoryRewrite(envWithout(childEnv, "PI_SUBAGENT_CHILD"))).toBeNull(); }); - it("does NOT override an explicit DYN_AGENT_PARENT_TRAJECTORY_ID (manual wins)", () => { + it("uses an explicit DYN_AGENT_PARENT_TRAJECTORY_ID when present (manual wins)", () => { expect( computeSubagentTrajectoryRewrite({ ...childEnv, DYN_AGENT_PARENT_TRAJECTORY_ID: "manual-parent" }), - ).toBeNull(); + ).toEqual({ + parentTrajectoryId: "manual-parent", + trajectoryId: "run-1:researcher:2", + }); }); - it("skips when inherited DYN_AGENT_TRAJECTORY_ID is absent", () => { - 
expect(computeSubagentTrajectoryRewrite(envWithout(childEnv, "DYN_AGENT_TRAJECTORY_ID"))).toBeNull(); + it("still creates a child trajectory when inherited DYN_AGENT_TRAJECTORY_ID is absent", () => { + expect(computeSubagentTrajectoryRewrite(envWithout(childEnv, "DYN_AGENT_TRAJECTORY_ID"))).toEqual({ + trajectoryId: "run-1:researcher:2", + }); }); it("skips when PI_SUBAGENT_RUN_ID or PI_SUBAGENT_CHILD_AGENT is missing", () => { @@ -142,6 +149,7 @@ describe("pi-subagents trajectory bridge", () => { const cfg = readDynamoConfig(childEnv); expect(cfg.trajectoryId).toBe("run-1:researcher:2"); expect(cfg.parentTrajectoryId).toBe("parent-traj"); + expect(cfg.isSubagent).toBe(true); }); it("applySubagentBridge mutates process.env so nested spawns chain correctly", () => { @@ -172,7 +180,7 @@ describe("pi-subagents trajectory bridge", () => { describe("root trajectory seed", () => { it("seeds DYN_AGENT_TRAJECTORY_ID at the root so subagents inherit a parent", () => { - const env: NodeJS.ProcessEnv = { DYN_AGENT_TRACE: "1" }; + const env: NodeJS.ProcessEnv = { DYN_REQUEST_TRACE: "1" }; expect(seedRootTrajectory(env, () => "root-traj")).toBe(true); expect(env.DYN_AGENT_TRAJECTORY_ID).toBe("root-traj"); // The bug fix: a subagent spawned from this env now resolves a parent. 
@@ -189,15 +197,15 @@ describe("root trajectory seed", () => { }); it("uses DYN_AGENT_SESSION_ID as the root trajectory when present", () => { - const env: NodeJS.ProcessEnv = { DYN_AGENT_TRACE: "1", DYN_AGENT_SESSION_ID: "sess-7" }; + const env: NodeJS.ProcessEnv = { DYN_REQUEST_TRACE: "1", DYN_AGENT_SESSION_ID: "sess-7" }; expect(seedRootTrajectory(env, () => "unused")).toBe(true); expect(env.DYN_AGENT_TRAJECTORY_ID).toBe("sess-7"); }); it("no-ops when trace is off, in a subagent child, or trajectory already set", () => { expect(seedRootTrajectory({}, () => "x")).toBe(false); - expect(seedRootTrajectory({ DYN_AGENT_TRACE: "1", PI_SUBAGENT_CHILD: "1" }, () => "x")).toBe(false); - const preset: NodeJS.ProcessEnv = { DYN_AGENT_TRACE: "1", DYN_AGENT_TRAJECTORY_ID: "caller" }; + expect(seedRootTrajectory({ DYN_REQUEST_TRACE: "1", PI_SUBAGENT_CHILD: "1" }, () => "x")).toBe(false); + const preset: NodeJS.ProcessEnv = { DYN_REQUEST_TRACE: "1", DYN_AGENT_TRAJECTORY_ID: "caller" }; expect(seedRootTrajectory(preset, () => "x")).toBe(false); expect(preset.DYN_AGENT_TRAJECTORY_ID).toBe("caller"); }); @@ -318,7 +326,7 @@ describe("streamSimple wrapper", () => { }); }); - it("injects nothing when DYN_AGENT_TRACE is off (plain provider), but still sets x-request-id", async () => { + it("injects nothing when DYN_REQUEST_TRACE is off (plain provider), but still sets x-request-id", async () => { let capturedOptions: SimpleStreamOptions | undefined; const streamSimple = createDynamoStreamSimple( { ...config, traceEnabled: false }, @@ -337,7 +345,7 @@ describe("streamSimple wrapper", () => { }); }); -describe("subagent session control", () => { +describe("subagent trajectory context", () => { const subagentEnv = { DYNAMO_BASE_URL: "http://dynamo.test", DYN_AGENT_TRAJECTORY_ID: "orchestrator", @@ -347,74 +355,28 @@ describe("subagent session control", () => { PI_SUBAGENT_CHILD_INDEX: "3", } as const; - it("sets sessionControlId only for a pi-subagents child", () => { - 
expect(readDynamoConfig(subagentEnv).sessionControlId).toBe("run-1:scout:3"); - // Lead agent (no subagent bookkeeping) stays unpinned. + it("sets child trajectory only for a pi-subagents child", () => { + expect(computeSubagentTrajectoryId(subagentEnv)).toBe("run-1:scout:3"); const { PI_SUBAGENT_CHILD: _omit, ...leadEnv } = subagentEnv; - expect(readDynamoConfig(leadEnv).sessionControlId).toBeUndefined(); + expect(computeSubagentTrajectoryId(leadEnv)).toBeUndefined(); }); - it("derives sessionControlId from PI_SUBAGENT_* alone — no DYN_AGENT_TRAJECTORY_ID needed", () => { - // Decoupled from the trajectory bridge: a subagent gets KV isolation even - // when no trajectory lineage was set up by the operator. + it("derives child trajectory from PI_SUBAGENT_* alone", () => { const noTrajectory = envWithout(subagentEnv, "DYN_AGENT_TRAJECTORY_ID"); const cfg = readDynamoConfig(noTrajectory); - expect(cfg.sessionControlId).toBe("run-1:scout:3"); - expect(cfg.trajectoryId).toBeUndefined(); + expect(cfg.trajectoryId).toBe("run-1:scout:3"); expect(cfg.parentTrajectoryId).toBeUndefined(); + expect(cfg.isSubagent).toBe(true); }); it("requires a complete subagent identity (run id + agent name)", () => { - expect(readDynamoConfig(envWithout(subagentEnv, "PI_SUBAGENT_RUN_ID")).sessionControlId).toBeUndefined(); - expect(readDynamoConfig(envWithout(subagentEnv, "PI_SUBAGENT_CHILD_AGENT")).sessionControlId).toBeUndefined(); + expect(computeSubagentTrajectoryId(envWithout(subagentEnv, "PI_SUBAGENT_RUN_ID"))).toBeUndefined(); + expect(computeSubagentTrajectoryId(envWithout(subagentEnv, "PI_SUBAGENT_CHILD_AGENT"))).toBeUndefined(); // Index defaults to 0 when absent. 
- expect(readDynamoConfig(envWithout(subagentEnv, "PI_SUBAGENT_CHILD_INDEX")).sessionControlId).toBe("run-1:scout:0"); - }); - - it("parses DYN_AGENT_SESSION_TIMEOUT, ignoring non-positive values", () => { - expect(readDynamoConfig({ ...subagentEnv, DYN_AGENT_SESSION_TIMEOUT: "60" }).sessionTimeoutSecs).toBe(60); - expect(readDynamoConfig({ ...subagentEnv, DYN_AGENT_SESSION_TIMEOUT: "0" }).sessionTimeoutSecs).toBeUndefined(); - expect(readDynamoConfig({ ...subagentEnv, DYN_AGENT_SESSION_TIMEOUT: "junk" }).sessionTimeoutSecs).toBeUndefined(); - expect(readDynamoConfig(subagentEnv).sessionTimeoutSecs).toBeUndefined(); - }); - - it("opens on the first turn then goes sticky, carrying the timeout when set", () => { - const session = new DynamoSubagentSession({ - baseUrl: "http://dynamo.test/v1", - apiKey: "k", - sessionControlId: "run-1:scout:3", - sessionTimeoutSecs: 60, - }); - expect(session.controlForTurn()).toEqual({ session_id: "run-1:scout:3", timeout: 60 }); - expect(session.controlForTurn()).toEqual({ session_id: "run-1:scout:3", timeout: 60 }); - expect(session.controlForTurn()).toEqual({ session_id: "run-1:scout:3", timeout: 60 }); + expect(computeSubagentTrajectoryId(envWithout(subagentEnv, "PI_SUBAGENT_CHILD_INDEX"))).toBe("run-1:scout:0"); }); - it("omits the timeout field when no override is configured", () => { - const session = new DynamoSubagentSession({ - baseUrl: "http://dynamo.test/v1", - apiKey: "k", - sessionControlId: "sess-1", - }); - expect(session.controlForTurn()).toEqual({ session_id: "sess-1" }); - }); - - it("merges nvext.session_control without dropping existing nvext fields", () => { - const payload = mergeDynamoSessionControl( - { model: "demo", nvext: { extra_fields: ["worker_id"], agent_context: { phase: "reasoning" } } }, - { session_id: "sess-1", timeout: 60 }, - ); - expect(payload).toEqual({ - model: "demo", - nvext: { - extra_fields: ["worker_id"], - agent_context: { phase: "reasoning" }, - session_control: { session_id: 
"sess-1", timeout: 60 }, - }, - }); - }); - - it("close fires a throwaway action:close request, is idempotent, and skips before any turn", async () => { + it("trajectory_final sends agent_context only", async () => { const calls: Array<{ url: string; body: unknown; headers: unknown }> = []; const fakeFetch = async (url: string, init: RequestInit) => { calls.push({ @@ -425,20 +387,8 @@ describe("subagent session control", () => { return { ok: true, status: 200 }; }; - const session = new DynamoSubagentSession( - { baseUrl: "http://dynamo.test/v1", apiKey: "k", sessionControlId: "sess-1" }, - () => "close-req-1", - ); - - // No turn has tagged the session yet: close is a no-op. - expect(await session.close(fakeFetch)).toBe(false); - expect(calls).toHaveLength(0); - - session.controlForTurn(); // first tagged turn - session.modelId = "zai-org/GLM-4.7-Flash"; - - expect(await session.close(fakeFetch)).toBe(true); - expect(await session.close(fakeFetch)).toBe(false); // idempotent + const cfg = readDynamoConfig({ ...subagentEnv, DYN_REQUEST_TRACE: "1", DYN_AGENT_SESSION_ID: "run-1" }); + expect(await sendTrajectoryFinal(cfg, "zai-org/GLM-4.7-Flash", () => "close-req-1", fakeFetch)).toBe(true); expect(calls).toHaveLength(1); expect(calls[0]?.url).toBe("http://dynamo.test/v1/chat/completions"); expect(calls[0]?.body).toEqual({ @@ -446,57 +396,46 @@ describe("subagent session control", () => { messages: [{ role: "user", content: "." 
}], max_tokens: 1, stream: false, - nvext: { session_control: { session_id: "sess-1", action: "close" } }, + nvext: { + agent_context: { + trajectory_id: "run-1:scout:3", + parent_trajectory_id: "orchestrator", + session_id: "run-1", + session_type_id: DEFAULT_SESSION_TYPE_ID, + phase: "reasoning", + trajectory_final: true, + }, + }, }); expect((calls[0]?.headers as Record<string, string>)["x-request-id"]).toBe("close-req-1"); }); - it("re-arms on the next turn after a close (multi-prompt subagent)", async () => { - const fakeFetch = async () => ({ ok: true, status: 200 }); - const session = new DynamoSubagentSession({ - baseUrl: "http://dynamo.test/v1", - apiKey: "k", - sessionControlId: "sess-1", - }); - - expect(session.controlForTurn()).toEqual({ session_id: "sess-1" }); - expect(session.controlForTurn()).toEqual({ session_id: "sess-1" }); - expect(await session.close(fakeFetch)).toBe(true); - // A later prompt's first turn re-tags the session; close fires again. - expect(session.controlForTurn()).toEqual({ session_id: "sess-1" }); - expect(await session.close(fakeFetch)).toBe(true); - }); - - it("streamSimple injects session_control alongside agent_context", async () => { + it("streamSimple injects subagent agent_context without session_control", async () => { let capturedOptions: SimpleStreamOptions | undefined; - const session = new DynamoSubagentSession({ - baseUrl: "http://dynamo.test/v1", - apiKey: "k", - sessionControlId: "run-1:scout:3", - sessionTimeoutSecs: 60, - }); + const subagentConfig = readDynamoConfig({ ...subagentEnv, DYN_REQUEST_TRACE: "1", DYN_AGENT_SESSION_ID: "run-1" }); const streamSimple = createDynamoStreamSimple( - config, + subagentConfig, (_model, _context, options) => { capturedOptions = options; return createAssistantMessageEventStream(); }, () => "request-1", - session, ); streamSimple(model, context, { sessionId: "pi-session" }); const onPayload = capturedOptions?.onPayload; if (!onPayload) throw new Error("expected wrapped onPayload"); const
injected = (await onPayload({ model: DEFAULT_DYNAMO_MODEL_ID }, model)) as { - nvext: { agent_context: unknown; session_control: unknown }; + nvext: { agent_context: unknown; session_control?: unknown }; }; - expect(injected.nvext.agent_context).toMatchObject({ phase: "reasoning" }); - expect(injected.nvext.session_control).toEqual({ - session_id: "run-1:scout:3", - timeout: 60, + expect(injected.nvext.agent_context).toEqual({ + trajectory_id: "run-1:scout:3", + parent_trajectory_id: "orchestrator", + session_id: "run-1", + session_type_id: DEFAULT_SESSION_TYPE_ID, + phase: "reasoning", }); - expect(session.modelId).toBe(DEFAULT_DYNAMO_MODEL_ID); + expect(injected.nvext.session_control).toBeUndefined(); }); }); diff --git a/test/integration/smoke.mjs b/test/integration/smoke.mjs index bd87ff8..15130d1 100644 --- a/test/integration/smoke.mjs +++ b/test/integration/smoke.mjs @@ -3,7 +3,7 @@ // Integration smoke test: spins up a Dynamo frontend + mocker, sends one chat // completion through pi-dynamo-provider's streamSimple wrapper, and asserts -// that nvext.agent_context fields round-trip into the JSONL agent trace. +// that nvext.agent_context fields round-trip into the JSONL request trace. // // Not a unit test — runs out-of-band of vitest. 
Driven by // scripts/integration-smoke.sh which boots Dynamo, exports the trace sink env @@ -28,7 +28,7 @@ import { readDynamoConfig, } from "../../dist/dynamo-provider.js"; -const TRACE_PATH = mustEnv("DYN_AGENT_TRACE_OUTPUT_PATH"); +const TRACE_PATH = mustEnv("DYN_REQUEST_TRACE_OUTPUT_PATH"); const BASE_URL = mustEnv("DYNAMO_BASE_URL"); const MODEL_ID = mustEnv("DYNAMO_TEST_MODEL_ID"); diff --git a/test/program-close.test.ts b/test/program-close.test.ts index 2532f46..7d12dfe 100644 --- a/test/program-close.test.ts +++ b/test/program-close.test.ts @@ -43,7 +43,7 @@ describe("program close (trajectory_final) — multiturn", () => { beforeEach(() => { process.env = { ...savedEnv, - DYN_AGENT_TRACE: "1", + DYN_REQUEST_TRACE: "1", DYN_AGENT_SESSION_ID: "t-1", DYN_AGENT_TRAJECTORY_ID: "t-1", DYNAMO_BASE_URL: "http://frontend:8000/v1", @@ -87,3 +87,39 @@ describe("program close (trajectory_final) — multiturn", () => { expect(closeBodies[0].max_tokens).toBe(1); }); }); + +describe("subagent trajectory close", () => { + const savedEnv = process.env; + beforeEach(() => { + process.env = { + ...savedEnv, + DYN_REQUEST_TRACE: "1", + DYN_AGENT_SESSION_ID: "root-session", + DYN_AGENT_TRAJECTORY_ID: "root-trajectory", + DYNAMO_BASE_URL: "http://frontend:8000/v1", + PI_SUBAGENT_CHILD: "1", + PI_SUBAGENT_RUN_ID: "root-session", + PI_SUBAGENT_CHILD_AGENT: "researcher", + PI_SUBAGENT_CHILD_INDEX: "0", + }; + }); + afterEach(() => { + process.env = savedEnv; + vi.unstubAllGlobals(); + vi.restoreAllMocks(); + }); + + it("closes the child trajectory on agent_end", async () => { + const closeBodies = installFetch(); + const { pi, handlers } = makePi(); + await dynamoProviderExtension(pi as any); + await handlers.agent_end!({ type: "agent_end", messages: [] }); + await handlers.session_shutdown!({ type: "session_shutdown", reason: "quit" }); + + expect(closeBodies).toHaveLength(1); + const ctx = closeBodies[0].nvext.agent_context; + expect(ctx.trajectory_final).toBe(true); + 
expect(ctx.trajectory_id).toBe("root-session:researcher:0"); + expect(ctx.parent_trajectory_id).toBe("root-trajectory"); + }); +}); diff --git a/test/tool-relay.test.ts b/test/tool-relay.test.ts index 1b89956..9f4d3e8 100644 --- a/test/tool-relay.test.ts +++ b/test/tool-relay.test.ts @@ -6,13 +6,13 @@ import type { ExtensionContext } from "@mariozechner/pi-coding-agent"; import { describe, expect, it } from "vitest"; import { DEFAULT_DYNAMO_BASE_URL, DEFAULT_SESSION_TYPE_ID } from "../src/dynamo-provider.js"; import { - buildDynamoTraceAgentContext, + buildDynamoRequestTraceAgentContext, DEFAULT_TOOL_EVENT_QUEUE_CAPACITY, DynamoToolEventPublisher, DynamoToolEventRelay, getToolClass, readDynamoToolRelayConfig, - type DynamoAgentTraceRecord, + type DynamoRequestTraceRecord, type ToolEventSocket, } from "../src/tool-relay.js"; @@ -49,16 +49,16 @@ function createContext(sessionId: string): ExtensionContext { } as unknown as ExtensionContext; } -function decodeTraceRecord(frame: Buffer): DynamoAgentTraceRecord { - return decode(frame) as DynamoAgentTraceRecord; +function decodeTraceRecord(frame: Buffer): DynamoRequestTraceRecord { + return decode(frame) as DynamoRequestTraceRecord; } describe("tool relay config", () => { it("reads Dynamo tool relay env aliases", () => { expect( readDynamoToolRelayConfig({ - DYN_AGENT_TRACE_TOOL_EVENTS_ZMQ_ENDPOINT: "tcp://127.0.0.1:20390", - DYN_AGENT_TRACE_TOOL_EVENTS_ZMQ_TOPIC: "tools", + DYN_REQUEST_TRACE_TOOL_EVENTS_ZMQ_ENDPOINT: "tcp://127.0.0.1:20390", + DYN_REQUEST_TRACE_TOOL_EVENTS_ZMQ_TOPIC: "tools", }), ).toEqual({ endpoint: "tcp://127.0.0.1:20390", @@ -68,9 +68,9 @@ describe("tool relay config", () => { expect( readDynamoToolRelayConfig({ - DYN_AGENT_TOOL_EVENTS_ZMQ_ENDPOINT: "ipc:///tmp/pi-tools", - DYN_AGENT_TOOL_EVENTS_ZMQ_TOPIC: "pi-tools", - DYN_AGENT_TOOL_EVENTS_QUEUE_CAPACITY: "7", + DYN_REQUEST_TRACE_TOOL_EVENTS_ZMQ_ENDPOINT: "ipc:///tmp/pi-tools", + DYN_REQUEST_TRACE_TOOL_EVENTS_ZMQ_TOPIC: "pi-tools", + 
DYN_REQUEST_TRACE_TOOL_EVENTS_QUEUE_CAPACITY: "7", }), ).toEqual({ endpoint: "ipc:///tmp/pi-tools", @@ -82,7 +82,7 @@ describe("tool relay config", () => { describe("tool relay agent context", () => { it("uses the Pi session ID as default trajectory and session ID", () => { - expect(buildDynamoTraceAgentContext(config, "pi-session")).toEqual({ + expect(buildDynamoRequestTraceAgentContext(config, "pi-session")).toEqual({ session_type_id: DEFAULT_SESSION_TYPE_ID, session_id: "pi-session", trajectory_id: "pi-session", @@ -91,7 +91,7 @@ describe("tool relay agent context", () => { it("uses env session/trajectory IDs when provided", () => { expect( - buildDynamoTraceAgentContext( + buildDynamoRequestTraceAgentContext( { ...config, sessionId: "session-1", @@ -138,7 +138,7 @@ describe("tool relay records", () => { expect(socket.sent[0]?.[0].toString("utf8")).toBe("tools"); expect(socket.sent[0]?.[1].readBigUInt64BE()).toBe(0n); expect(decodeTraceRecord(socket.sent[0]?.[2] ?? Buffer.alloc(0))).toEqual({ - schema: "dynamo.agent.trace.v1", + schema: "dynamo.request.trace.v1", event_type: "tool_start", event_time_unix_ms: 1000, event_source: "harness", @@ -171,7 +171,7 @@ describe("tool relay records", () => { expect(socket.sent).toHaveLength(2); expect(socket.sent[1]?.[1].readBigUInt64BE()).toBe(1n); expect(decodeTraceRecord(socket.sent[1]?.[2] ?? Buffer.alloc(0))).toEqual({ - schema: "dynamo.agent.trace.v1", + schema: "dynamo.request.trace.v1", event_type: "tool_end", event_time_unix_ms: 1500, event_source: "harness", @@ -213,7 +213,7 @@ describe("tool relay records", () => { await publisher.flush(); expect(decodeTraceRecord(socket.sent[0]?.[2] ?? 
Buffer.alloc(0))).toEqual({ - schema: "dynamo.agent.trace.v1", + schema: "dynamo.request.trace.v1", event_type: "tool_error", event_time_unix_ms: 2000, event_source: "harness", From dc0f5db82d116ebc3424458efdb665981ec5425f Mon Sep 17 00:00:00 2001 From: Ishan Dhanani Date: Mon, 15 Jun 2026 13:04:45 +0000 Subject: [PATCH 3/6] fix: include trace switch in launcher env Signed-off-by: Ishan Dhanani --- scripts/launch-agg-agent.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/scripts/launch-agg-agent.sh b/scripts/launch-agg-agent.sh index a48160c..a34725f 100755 --- a/scripts/launch-agg-agent.sh +++ b/scripts/launch-agg-agent.sh @@ -162,6 +162,7 @@ Pi environment for another shell: export DYNAMO_BASE_URL=http://127.0.0.1:${HTTP_PORT}/v1 export DYNAMO_API_KEY=dummy + export DYN_REQUEST_TRACE=1 export DYN_AGENT_SESSION_TYPE_ID=pi_coding_agent export DYN_AGENT_SESSION_ID=pi-demo-${RUN_ID} export DYN_REQUEST_TRACE_TOOL_EVENTS_ZMQ_ENDPOINT=tcp://127.0.0.1:20390 From dc07815d497eb361ea357e26670eb8ac82ffa78d Mon Sep 17 00:00:00 2001 From: Ishan Dhanani Date: Mon, 15 Jun 2026 13:25:11 +0000 Subject: [PATCH 4/6] docs: add headless pi dynamo skill Signed-off-by: Ishan Dhanani --- CLAUDE.md | 2 + skills/pi-headless-dynamo/SKILL.md | 161 +++++++++++++++++++ skills/pi-headless-dynamo/agents/openai.yaml | 4 + 3 files changed, 167 insertions(+) create mode 100644 skills/pi-headless-dynamo/SKILL.md create mode 100644 skills/pi-headless-dynamo/agents/openai.yaml diff --git a/CLAUDE.md b/CLAUDE.md index f84d45a..45c4c38 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -24,6 +24,8 @@ Tests live in `test/` as siblings of `src/`. Use vitest's `describe`/`it`/`expec `test/integration/smoke.mjs` is the out-of-band end-to-end check — driven by `scripts/integration-smoke.sh`, not vitest. It boots Dynamo's frontend + mocker, sends one real chat completion, and asserts `nvext.agent_context` round-trips into the request trace JSONL. Two cases: top-level agent_context and the pi-subagents bridge. 
Mocker output is garbage; assertions only target the trace envelope. CI clones `ai-dynamo/dynamo@main` and builds from source. Cargo cache keeps warm runs ~60-90s, cold ~10 min. `workflow_dispatch` accepts a `dynamo_ref` input for ad-hoc validation against a specific branch, tag, or SHA. +For real Pi CLI lifecycle validation against a Dynamo endpoint, read `skills/pi-headless-dynamo/SKILL.md` first and drive the actual interactive Pi TUI instead of faking provider requests or pi-subagents env. + ## Coding standards - TypeScript strict mode. Don't add `any`; prefer `unknown` + narrow. diff --git a/skills/pi-headless-dynamo/SKILL.md b/skills/pi-headless-dynamo/SKILL.md new file mode 100644 index 0000000..0faa3e3 --- /dev/null +++ b/skills/pi-headless-dynamo/SKILL.md @@ -0,0 +1,161 @@ +--- +name: pi-headless-dynamo +description: Drive the real Pi CLI headlessly against a Dynamo or OpenAI-compatible endpoint for pi-dynamo-provider validation. Use when testing Pi provider installs, agent_context tracing, trajectory-native lifecycle release, Pi subagent runs, saved traces, or parent/child session behavior without manually faking Pi or pi-subagents internals. +--- + +# Pi Headless Dynamo + +## Purpose + +Drive Pi the way a human would: launch the real interactive Pi CLI in a +pseudoterminal, type normal prompts or public slash commands, let Pi and +pi-subagents create child sessions, then verify behavior from Pi artifacts and +Dynamo/SGLang logs. + +Do not synthesize `PI_SUBAGENT_*`, edit Pi session files, call the provider +directly to stand in for Pi, or patch pi-subagents while validating this repo. 
+ +## Preconditions + +Use a running Dynamo endpoint or start one with the repo launcher: + +```bash +scripts/launch-agg-agent.sh --dynamo-dir /ephemeral/dynamo-radix-native --gpu 0,1 --tp 2 --http-port 18083 --system-port 18084 +``` + +Before launching Pi, verify the endpoint and model: + +```bash +curl -sf http://127.0.0.1:18083/v1/models +``` + +For trajectory-native release evidence, the endpoint must use Dynamo +`--router-mode kv` and an SGLang worker with `--enable-session-radix-cache`. +The local launcher prints the exact Pi environment and trace path; prefer that +block over hand-rolled env. + +## Launch Pi + +Use a fresh artifact root and run the real TUI under `script(1)` so the +terminal transcript is saved: + +```bash +RUN_ROOT=/ephemeral/pi-headless-$(date -u +%Y%m%dT%H%M%SZ) +WORKSPACE=$RUN_ROOT/workspace +MODEL=zai-org/GLM-4.7-Flash +mkdir -p "$WORKSPACE" "$RUN_ROOT/pi-sessions" + +export DYNAMO_BASE_URL=http://127.0.0.1:18083/v1 +export DYNAMO_API_KEY=dummy +export DYN_REQUEST_TRACE=1 +export DYN_AGENT_SESSION_TYPE_ID=pi_coding_agent +export DYN_AGENT_SESSION_ID=pi-headless-$(date -u +%Y%m%dT%H%M%SZ) +export DYN_REQUEST_TRACE_TOOL_EVENTS_ZMQ_ENDPOINT=tcp://127.0.0.1:20390 + +cd "$WORKSPACE" +script -qefc "pi --model dynamo/${MODEL} --tools subagent,bash,write,read,ls,grep,find --session-dir ${RUN_ROOT}/pi-sessions" "${RUN_ROOT}/pi-terminal.typescript" +``` + +Control that process through its PTY like a user: + +- wait for the Pi prompt before sending the first prompt; +- paste a full prompt or slash command as text; +- send Enter to submit; +- wait for Pi to finish before sending the next prompt; +- exit with Ctrl-D so root `trajectory_final` is emitted. + +Do not kill Pi to end a lifecycle run unless it is hung and the failure is the +thing being tested. + +## Drive A Lifecycle Run + +Start with a normal parent turn: + +```text +Create a short project brief in this workspace, then tell me when you are ready for delegated follow-up work. 
+``` + +Launch children through pi-subagents' public command surface. For Dynamo tests, +pin the child model in every step; otherwise builtins may inherit a non-Dynamo +default model and fail before reaching the endpoint. + +```text +/parallel delegate[model=dynamo/zai-org/GLM-4.7-Flash,output=child-a.md,outputMode=file-only] "Work only in the current workspace. Create logs/a.md with a concise result. Read it back. End with CHILD_A_DONE." -> delegate[model=dynamo/zai-org/GLM-4.7-Flash,output=child-b.md,outputMode=file-only] "Work only in the current workspace. Create logs/b.md with a concise result. Read it back. End with CHILD_B_DONE." -> delegate[model=dynamo/zai-org/GLM-4.7-Flash,output=child-c.md,outputMode=file-only] "Work only in the current workspace. Create logs/c.md with a concise result. Read it back. End with CHILD_C_DONE." +``` + +Prefer `delegate` for lifecycle plumbing tests. Agents such as `worker`, +`planner`, and `oracle` may default to forked context; forked context is valid +but has persisted-session preconditions. If a lifecycle test does not need +forked context, avoid making that a second variable. + +After Pi reports the children complete, keep talking to the parent without +subagents: + +```text +Do not call subagent. Inspect the child artifacts in this workspace and summarize them. End with PARENT_AFTER_CHILDREN_OK. +``` + +Then send one final parent-only turn: + +```text +One final parent-only turn. Do not call subagent. Rank the top two artifacts for follow-up and give one reason each. End with PARENT_FINAL_OK. +``` + +Exit the Pi session with Ctrl-D. 
+ +## Verify Evidence + +Collect the artifact paths in the final report: + +- Pi transcript: `${RUN_ROOT}/pi-terminal.typescript` +- Pi sessions: `${RUN_ROOT}/pi-sessions` +- Dynamo trace: the `dynamo-request-trace.jsonl` path printed by the launcher +- frontend and worker logs from the launcher run directory + +Useful checks: + +```bash +TRACE_PATH=/path/from/launcher/dynamo-request-trace.jsonl +FRONTEND_LOG=/path/from/launcher/logs/frontend.log +WORKER_LOG=/path/from/launcher/logs/worker.log + +rg -n "CHILD_.*_DONE|PARENT_AFTER_CHILDREN_OK|PARENT_FINAL_OK" "$RUN_ROOT/pi-terminal.typescript" "$RUN_ROOT/pi-sessions" + +jq -s '{ + events: length, + output_tokens_total: (map(.event.request.output_tokens // 0) | add), + input_lengths: { + min: (map(.event.request.replay.input_length // 0) | min), + max: (map(.event.request.replay.input_length // 0) | max) + }, + first_ms: .[0].event.event_time_unix_ms, + last_ms: .[-1].event.event_time_unix_ms +}' "$TRACE_PATH" + +rg -n "Removing session affinity|close_session response|release_session" "$FRONTEND_LOG" "$WORKER_LOG" + +ps -u "$USER" -o pid,args | rg 'launch-agg-agent|dynamo\.frontend|dynamo\.sglang|sglang::|aiperf|pi --model' | rg -v 'rg|bash -lc ps' || true +nvidia-smi --query-gpu=index,name,memory.used,memory.total --format=csv,noheader +``` + +The lifecycle ordering to prove: + +1. Child trajectories close and release first. +2. Parent-only turns still run after child release. +3. Root trajectory closes only after Pi exits normally. +4. The server is stopped and GPUs return to baseline. + +## Troubleshooting + +- `401 "Invalid username or password."` in child sessions means a child did not + use the Dynamo model. Add `model=dynamo/<model-id>` to every subagent + step or configure `subagents.agentOverrides`. +- `Failed to create forked subagent session` means a forked-context child could + not branch from the parent session.
Use `delegate` for fresh-context lifecycle + tests, avoid `--fork`, or remove custom session-dir variables while debugging + fork behavior. +- No `agent_context` or no trace rows means `DYN_REQUEST_TRACE=1` was missing + from the Pi process environment. +- No `release_session` after finality usually means Dynamo was not in + `--router-mode kv`, the worker lacked `--enable-session-radix-cache`, or Pi + was killed instead of exited with Ctrl-D. diff --git a/skills/pi-headless-dynamo/agents/openai.yaml b/skills/pi-headless-dynamo/agents/openai.yaml new file mode 100644 index 0000000..66461a1 --- /dev/null +++ b/skills/pi-headless-dynamo/agents/openai.yaml @@ -0,0 +1,4 @@ +interface: + display_name: "Pi Headless Dynamo" + short_description: "Drive Pi headlessly against Dynamo like an interactive user." + default_prompt: "Drive a Pi CLI session headlessly against a Dynamo endpoint, including subagents and lifecycle trace validation." From 9540d6ce8fb70297edb24061dea78ba27f2dcc53 Mon Sep 17 00:00:00 2001 From: Ishan Dhanani Date: Mon, 15 Jun 2026 13:29:20 +0000 Subject: [PATCH 5/6] docs: simplify pi headless env and cover forked context Signed-off-by: Ishan Dhanani --- scripts/launch-agg-agent.sh | 2 -- skills/pi-headless-dynamo/SKILL.md | 49 ++++++++++++++++++++++-------- 2 files changed, 37 insertions(+), 14 deletions(-) diff --git a/scripts/launch-agg-agent.sh b/scripts/launch-agg-agent.sh index a34725f..f32b419 100755 --- a/scripts/launch-agg-agent.sh +++ b/scripts/launch-agg-agent.sh @@ -163,8 +163,6 @@ Pi environment for another shell: export DYNAMO_BASE_URL=http://127.0.0.1:${HTTP_PORT}/v1 export DYNAMO_API_KEY=dummy export DYN_REQUEST_TRACE=1 - export DYN_AGENT_SESSION_TYPE_ID=pi_coding_agent - export DYN_AGENT_SESSION_ID=pi-demo-${RUN_ID} export DYN_REQUEST_TRACE_TOOL_EVENTS_ZMQ_ENDPOINT=tcp://127.0.0.1:20390 Example Pi command: diff --git a/skills/pi-headless-dynamo/SKILL.md b/skills/pi-headless-dynamo/SKILL.md index 0faa3e3..5ebb377 100644 --- 
a/skills/pi-headless-dynamo/SKILL.md +++ b/skills/pi-headless-dynamo/SKILL.md @@ -43,17 +43,15 @@ terminal transcript is saved: RUN_ROOT=/ephemeral/pi-headless-$(date -u +%Y%m%dT%H%M%SZ) WORKSPACE=$RUN_ROOT/workspace MODEL=zai-org/GLM-4.7-Flash -mkdir -p "$WORKSPACE" "$RUN_ROOT/pi-sessions" +mkdir -p "$WORKSPACE" export DYNAMO_BASE_URL=http://127.0.0.1:18083/v1 export DYNAMO_API_KEY=dummy export DYN_REQUEST_TRACE=1 -export DYN_AGENT_SESSION_TYPE_ID=pi_coding_agent -export DYN_AGENT_SESSION_ID=pi-headless-$(date -u +%Y%m%dT%H%M%SZ) export DYN_REQUEST_TRACE_TOOL_EVENTS_ZMQ_ENDPOINT=tcp://127.0.0.1:20390 cd "$WORKSPACE" -script -qefc "pi --model dynamo/${MODEL} --tools subagent,bash,write,read,ls,grep,find --session-dir ${RUN_ROOT}/pi-sessions" "${RUN_ROOT}/pi-terminal.typescript" +script -qefc "pi --model dynamo/${MODEL} --tools subagent,bash,write,read,ls,grep,find" "${RUN_ROOT}/pi-terminal.typescript" ``` Control that process through its PTY like a user: @@ -67,6 +65,11 @@ Control that process through its PTY like a user: Do not kill Pi to end a lifecycle run unless it is hung and the failure is the thing being tested. +`DYN_AGENT_SESSION_TYPE_ID` and `DYN_AGENT_SESSION_ID` are optional labels. The +provider defaults `session_type_id` to `pi_coding_agent`; normal LLM requests +use Pi's own session id when `DYN_AGENT_SESSION_ID` is absent. Set them only +when a run needs stable, human-chosen trace labels. + ## Drive A Lifecycle Run Start with a normal parent turn: @@ -83,10 +86,32 @@ default model and fail before reaching the endpoint. /parallel delegate[model=dynamo/zai-org/GLM-4.7-Flash,output=child-a.md,outputMode=file-only] "Work only in the current workspace. Create logs/a.md with a concise result. Read it back. End with CHILD_A_DONE." -> delegate[model=dynamo/zai-org/GLM-4.7-Flash,output=child-b.md,outputMode=file-only] "Work only in the current workspace. Create logs/b.md with a concise result. Read it back. End with CHILD_B_DONE." 
-> delegate[model=dynamo/zai-org/GLM-4.7-Flash,output=child-c.md,outputMode=file-only] "Work only in the current workspace. Create logs/c.md with a concise result. Read it back. End with CHILD_C_DONE." ``` -Prefer `delegate` for lifecycle plumbing tests. Agents such as `worker`, -`planner`, and `oracle` may default to forked context; forked context is valid -but has persisted-session preconditions. If a lifecycle test does not need -forked context, avoid making that a second variable. +Prefer `delegate` for fresh-context lifecycle plumbing tests. + +## Forked Context + +Forked context is a first-class scenario: it is how agents such as `worker`, +`planner`, and `oracle` inherit the parent conversation. Start with one forked +child after the parent has completed at least one normal turn, and do not pass a +custom `--session-dir` while validating fork behavior. Let Pi persist sessions +where it normally does and collect the session paths from Pi/pi-subagents output +afterward. + +```text +/run worker[model=dynamo/zai-org/GLM-4.7-Flash,output=fork-worker.md,outputMode=file-only] "Use the inherited parent context. Work only in the current workspace. Create logs/fork-worker.md with one follow-up note, read it back, and end with CHILD_FORK_DONE." --fork +``` + +Once the single fork passes, scale to parallel forked children: + +```text +/parallel worker[model=dynamo/zai-org/GLM-4.7-Flash,output=fork-a.md,outputMode=file-only] "Use inherited context. Create logs/fork-a.md, read it back, end CHILD_FORK_A_DONE." -> worker[model=dynamo/zai-org/GLM-4.7-Flash,output=fork-b.md,outputMode=file-only] "Use inherited context. Create logs/fork-b.md, read it back, end CHILD_FORK_B_DONE." --fork +``` + +For Dynamo/SGLang, the important expectation is that forked children may share +the parent's prefix KV. 
The SGLang session-radix cache supports multiple +session ids on the same radix node; closing one child must remove only that +child's holder and must not free a node still held by the parent or another +forked child. After Pi reports the children complete, keep talking to the parent without subagents: @@ -108,7 +133,7 @@ Exit the Pi session with Ctrl-D. Collect the artifact paths in the final report: - Pi transcript: `${RUN_ROOT}/pi-terminal.typescript` -- Pi sessions: `${RUN_ROOT}/pi-sessions` +- Pi sessions: paths printed by Pi/pi-subagents in the transcript or child result - Dynamo trace: the `dynamo-request-trace.jsonl` path printed by the launcher - frontend and worker logs from the launcher run directory @@ -151,9 +176,9 @@ The lifecycle ordering to prove: use the Dynamo model. Add `model=dynamo/<model-id>` to every subagent step or configure `subagents.agentOverrides`. - `Failed to create forked subagent session` means a forked-context child could - not branch from the parent session. Use `delegate` for fresh-context lifecycle - tests, avoid `--fork`, or remove custom session-dir variables while debugging - fork behavior. + not branch from the parent session. First retry with Pi's normal session + storage and no custom `--session-dir`; if it still fails, run + `/subagents-doctor` inside Pi and capture the exact parent/child session paths. - No `agent_context` or no trace rows means `DYN_REQUEST_TRACE=1` was missing from the Pi process environment.
- No `release_session` after finality usually means Dynamo was not in From 361f03783ff35531c31bfb91043a1d3ddb6b0004 Mon Sep 17 00:00:00 2001 From: Ishan Dhanani Date: Mon, 15 Jun 2026 15:26:49 +0000 Subject: [PATCH 6/6] fix: make pi finality close pings traceable --- scripts/launch-agg-agent.sh | 9 ++++++--- skills/pi-headless-dynamo/SKILL.md | 26 ++++++++++++++++++++------ src/dynamo-provider.ts | 7 ++++++- src/index.ts | 5 +++-- test/dynamo-provider.test.ts | 29 ++++++++++++++++++++++++++++- test/program-close.test.ts | 18 ++++++++++++++++-- 6 files changed, 79 insertions(+), 15 deletions(-) diff --git a/scripts/launch-agg-agent.sh b/scripts/launch-agg-agent.sh index f32b419..35f6e0c 100755 --- a/scripts/launch-agg-agent.sh +++ b/scripts/launch-agg-agent.sh @@ -21,7 +21,7 @@ usage() { Usage: scripts/launch-agg-agent.sh [OPTIONS] [-- SGLANG_ARGS...] Launch Dynamo's OpenAI-compatible frontend plus one SGLang worker for -GLM-4.7-Flash with agent tracing and Pi tool-event ingest enabled. +GLM-4.7-Flash with request tracing and Pi tool-event ingest enabled. This launcher uses file discovery, TCP request plane, and ZMQ event plane. It does not require NATS or etcd. 
@@ -46,6 +46,7 @@ Environment overrides: DYN_HTTP_PORT DYN_SYSTEM_PORT DYN_REQUEST_TRACE_OUTPUT_PATH + DYN_REQUEST_TRACE_TOOL_EVENTS_ZMQ_ENDPOINT Examples: scripts/launch-agg-agent.sh @@ -163,7 +164,7 @@ Pi environment for another shell: export DYNAMO_BASE_URL=http://127.0.0.1:${HTTP_PORT}/v1 export DYNAMO_API_KEY=dummy export DYN_REQUEST_TRACE=1 - export DYN_REQUEST_TRACE_TOOL_EVENTS_ZMQ_ENDPOINT=tcp://127.0.0.1:20390 + export DYN_REQUEST_TRACE_TOOL_EVENTS_ZMQ_ENDPOINT=${DYN_REQUEST_TRACE_TOOL_EVENTS_ZMQ_ENDPOINT} Example Pi command: @@ -177,7 +178,7 @@ Perfetto conversion: cd ${DYNAMO_DIR} source .venv/bin/activate - python benchmarks/agent_trace/convert_to_perfetto.py \\ + python benchmarks/request_trace/convert_to_perfetto.py \\ ${TRACE_PATH} \\ --include-markers \\ --separate-stage-tracks \\ @@ -222,6 +223,7 @@ export DYN_REQUEST_TRACE=1 export DYN_REQUEST_TRACE_SINKS="${DYN_REQUEST_TRACE_SINKS:-jsonl}" export DYN_REQUEST_TRACE_OUTPUT_PATH="$TRACE_PATH" export DYN_REQUEST_TRACE_JSONL_FLUSH_INTERVAL_MS="${DYN_REQUEST_TRACE_JSONL_FLUSH_INTERVAL_MS:-100}" +export DYN_REQUEST_TRACE_TOOL_EVENTS_ZMQ_ENDPOINT="${DYN_REQUEST_TRACE_TOOL_EVENTS_ZMQ_ENDPOINT:-tcp://127.0.0.1:20390}" export DYN_LOG="${DYN_LOG:-info}" log "Run directory: $RUN_DIR" @@ -234,6 +236,7 @@ log "Request plane: tcp" log "Event plane: zmq" log "HTTP: http://127.0.0.1:$HTTP_PORT/v1" log "Trace JSONL: $TRACE_PATH" +log "Tool-event ingest: $DYN_REQUEST_TRACE_TOOL_EVENTS_ZMQ_ENDPOINT" log "Starting Dynamo frontend" python3 -m dynamo.frontend \ diff --git a/skills/pi-headless-dynamo/SKILL.md b/skills/pi-headless-dynamo/SKILL.md index 5ebb377..6a2395a 100644 --- a/skills/pi-headless-dynamo/SKILL.md +++ b/skills/pi-headless-dynamo/SKILL.md @@ -60,7 +60,8 @@ Control that process through its PTY like a user: - paste a full prompt or slash command as text; - send Enter to submit; - wait for Pi to finish before sending the next prompt; -- exit with Ctrl-D so root `trajectory_final` is emitted. 
+- type `/quit` and wait for process exit so Pi emits `session_shutdown` and + the root `trajectory_final`. Do not kill Pi to end a lifecycle run unless it is hung and the failure is the thing being tested. @@ -126,7 +127,8 @@ Then send one final parent-only turn: One final parent-only turn. Do not call subagent. Rank the top two artifacts for follow-up and give one reason each. End with PARENT_FINAL_OK. ``` -Exit the Pi session with Ctrl-D. +Exit the Pi session with `/quit`, then wait for the `script(1)` process to exit +with code 0. ## Verify Evidence @@ -144,10 +146,12 @@ TRACE_PATH=/path/from/launcher/dynamo-request-trace.jsonl FRONTEND_LOG=/path/from/launcher/logs/frontend.log WORKER_LOG=/path/from/launcher/logs/worker.log -rg -n "CHILD_.*_DONE|PARENT_AFTER_CHILDREN_OK|PARENT_FINAL_OK" "$RUN_ROOT/pi-terminal.typescript" "$RUN_ROOT/pi-sessions" +rg -n "CHILD_.*_DONE|PARENT_AFTER_CHILDREN_OK|PARENT_FINAL_OK" "$RUN_ROOT/pi-terminal.typescript" jq -s '{ events: length, + agent_context_rows: (map(select(.event.agent_context? != null)) | length), + trajectory_final_rows: (map(select(.event.agent_context.trajectory_final == true)) | length), output_tokens_total: (map(.event.request.output_tokens // 0) | add), input_lengths: { min: (map(.event.request.replay.input_length // 0) | min), @@ -170,6 +174,11 @@ The lifecycle ordering to prove: 3. Root trajectory closes only after Pi exits normally. 4. The server is stopped and GPUs return to baseline. +With Dynamo request-trace unification (#10701 and later), `agent_context` lives +on the same `dynamo.request.trace.v1` rows as request metrics. If trace rows are +present but `agent_context_rows` is zero, check that Pi had +`DYN_REQUEST_TRACE=1` and that the provider package was installed from this repo. + ## Troubleshooting - `401 "Invalid username or password."` in child sessions means a child did not @@ -179,8 +188,13 @@ The lifecycle ordering to prove: not branch from the parent session. 
First retry with Pi's normal session storage and no custom `--session-dir`; if it still fails, run `/subagents-doctor` inside Pi and capture the exact parent/child session paths. -- No `agent_context` or no trace rows means `DYN_REQUEST_TRACE=1` was missing - from the Pi process environment. +- No trace rows means Dynamo was not launched with `DYN_REQUEST_TRACE=1` or the + trace path points at the wrong run. +- Trace rows without `agent_context` usually mean Pi was launched without + `DYN_REQUEST_TRACE=1` or with a stale provider install. +- Zero `trajectory_final_rows` after a clean `/quit` usually means the provider + install is stale or the endpoint rejected the close ping; check `frontend.log` + for the final POST status. - No `release_session` after finality usually means Dynamo was not in `--router-mode kv`, the worker lacked `--enable-session-radix-cache`, or Pi - was killed instead of exited with Ctrl-D. + was killed instead of exited with `/quit`. diff --git a/src/dynamo-provider.ts b/src/dynamo-provider.ts index f72dddd..2aab234 100644 --- a/src/dynamo-provider.ts +++ b/src/dynamo-provider.ts @@ -346,6 +346,7 @@ export async function sendTrajectoryFinal( ): Promise { const agentContext = { ...buildDynamoAgentContext(config), trajectory_final: true }; if (!agentContext.trajectory_id) return false; + const finalModelId = modelId.trim() || DEFAULT_DYNAMO_MODEL_ID; try { const response = await fetchImpl(`${config.baseUrl}/chat/completions`, { method: "POST", @@ -355,7 +356,7 @@ export async function sendTrajectoryFinal( "x-request-id": createRequestId(), }, body: JSON.stringify({ - model: modelId, + model: finalModelId, messages: [{ role: "user", content: "." 
}], max_tokens: 1, stream: false, @@ -375,6 +376,10 @@ export function createDynamoStreamSimple( createRequestId: () => string = randomUUID, ): ProviderStreamSimple { return (model: Model, context: Context, options?: SimpleStreamOptions): AssistantMessageEventStream => { + const runtimeSessionId = options?.sessionId?.trim(); + if (!config.sessionId && runtimeSessionId) { + config.sessionId = runtimeSessionId; + } const openAIModel = toOpenAICompletionsModel(model); const headers = buildDynamoHeaders(options?.headers, createRequestId); const baseOptions: SimpleStreamOptions = { diff --git a/src/index.ts b/src/index.ts index e8dcec5..c53814a 100644 --- a/src/index.ts +++ b/src/index.ts @@ -31,8 +31,10 @@ export default async function dynamoProviderExtension(pi: ExtensionAPI): Promise const discoveredModels = await discoverDynamoModels(config); const models = discoveredModels.length > 0 ? discoveredModels : createDynamoModels([DEFAULT_DYNAMO_MODEL_ID], config.baseUrl); + const closeModelId = models.map((model) => model.id.trim()).find((id) => id.length > 0) ?? DEFAULT_DYNAMO_MODEL_ID; + const providerModels = models.map((model) => ({ ...model })); - pi.registerProvider(DYNAMO_PROVIDER_ID, createDynamoProviderConfig(config, models)); + pi.registerProvider(DYNAMO_PROVIDER_ID, createDynamoProviderConfig(config, providerModels)); if (config.traceEnabled) { await registerDynamoToolEventRelay(pi, config); } @@ -42,7 +44,6 @@ export default async function dynamoProviderExtension(pi: ExtensionAPI): Promise // trajectory alive across reload/fork/new/resume flows. const programTrajectoryId = config.trajectoryId ?? config.sessionId; if (config.traceEnabled && programTrajectoryId) { - const closeModelId = models[0]?.id ?? 
DEFAULT_DYNAMO_MODEL_ID; let programClosed = false; const closeProgram = async (): Promise => { if (programClosed) return; diff --git a/test/dynamo-provider.test.ts b/test/dynamo-provider.test.ts index 71ec32d..b5694c8 100644 --- a/test/dynamo-provider.test.ts +++ b/test/dynamo-provider.test.ts @@ -13,6 +13,7 @@ import { DEFAULT_DYNAMO_BASE_URL, DEFAULT_DYNAMO_MODEL_ID, DEFAULT_SESSION_TYPE_ID, + type DynamoProviderRuntimeConfig, DYNAMO_API, mergeDynamoAgentContext, normalizeDynamoBaseUrl, @@ -287,9 +288,10 @@ describe("streamSimple wrapper", () => { it("delegates through openai-completions with injected payload and headers", async () => { let capturedModel: Model<"openai-completions"> | undefined; let capturedOptions: SimpleStreamOptions | undefined; + const runtimeConfig: DynamoProviderRuntimeConfig = { ...config }; const streamSimple = createDynamoStreamSimple( - config, + runtimeConfig, (openAIModel, _context, options) => { capturedModel = openAIModel; capturedOptions = options; @@ -311,6 +313,7 @@ describe("streamSimple wrapper", () => { expect(capturedModel?.api).toBe("openai-completions"); expect(capturedModel?.provider).toBe("dynamo"); + expect(runtimeConfig.sessionId).toBe("pi-session"); expect(capturedOptions?.apiKey).toBe("test-key"); expect(capturedOptions?.headers).toEqual({ "x-request-id": "request-1" }); expect(injectedPayload).toEqual({ @@ -410,6 +413,30 @@ describe("subagent trajectory context", () => { expect((calls[0]?.headers as Record)["x-request-id"]).toBe("close-req-1"); }); + it("reuses Pi's runtime session id for subagent trajectory_final", async () => { + const calls: Array<{ body: any }> = []; + const cfg = readDynamoConfig({ ...subagentEnv, DYN_REQUEST_TRACE: "1" }); + const streamSimple = createDynamoStreamSimple( + cfg, + (_model, _context, _options) => createAssistantMessageEventStream(), + () => "request-1", + ); + streamSimple(model, context, { sessionId: "pi-child-session" }); + + const fakeFetch = async (_url: string, init: 
RequestInit) => { + calls.push({ body: JSON.parse(String(init.body)) }); + return { ok: true, status: 200 }; + }; + + expect(await sendTrajectoryFinal(cfg, "zai-org/GLM-4.7-Flash", () => "close-req-1", fakeFetch)).toBe(true); + expect(calls[0]?.body.nvext.agent_context).toMatchObject({ + trajectory_id: "run-1:scout:3", + parent_trajectory_id: "orchestrator", + session_id: "pi-child-session", + trajectory_final: true, + }); + }); + it("streamSimple injects subagent agent_context without session_control", async () => { let capturedOptions: SimpleStreamOptions | undefined; const subagentConfig = readDynamoConfig({ ...subagentEnv, DYN_REQUEST_TRACE: "1", DYN_AGENT_SESSION_ID: "run-1" }); diff --git a/test/program-close.test.ts b/test/program-close.test.ts index 7d12dfe..1178686 100644 --- a/test/program-close.test.ts +++ b/test/program-close.test.ts @@ -11,10 +11,12 @@ import dynamoProviderExtension from "../src/index.js"; type Handler = (event: any, ctx?: any) => unknown | Promise; -function makePi() { +function makePi(onRegisterProvider?: (providerConfig: any) => void) { const handlers: Record = {}; const pi = { - registerProvider: vi.fn(), + registerProvider: vi.fn((_id: string, providerConfig: any) => { + onRegisterProvider?.(providerConfig); + }), on: (event: string, handler: Handler) => { handlers[event] = handler; }, @@ -86,6 +88,18 @@ describe("program close (trajectory_final) — multiturn", () => { expect(ctx.trajectory_id).toBe("t-1"); expect(closeBodies[0].max_tokens).toBe(1); }); + + it("uses a stable discovered model id for the shutdown close ping", async () => { + const closeBodies = installFetch(); + const { pi, handlers } = makePi((providerConfig) => { + providerConfig.models[0].id = ""; + }); + await dynamoProviderExtension(pi as any); + await handlers.session_shutdown!({ type: "session_shutdown", reason: "quit" }); + + expect(closeBodies).toHaveLength(1); + expect(closeBodies[0].model).toBe("nvidia/MiniMax-M2.7-NVFP4"); + }); }); describe("subagent 
trajectory close", () => {