diff --git a/.github/workflows/integration-smoke.yml b/.github/workflows/integration-smoke.yml index 87b500c..c54ce1a 100644 --- a/.github/workflows/integration-smoke.yml +++ b/.github/workflows/integration-smoke.yml @@ -3,8 +3,8 @@ name: integration-smoke -# End-to-end check that nvext.agent_context fields emitted by this package -# round-trip through Dynamo's actual frontend + mocker into the agent trace +# End-to-end check that x-dynamo-trajectory-id emitted by this package +# round-trips through Dynamo's actual frontend + mocker into the request trace # sink. Builds Dynamo from ai-dynamo/dynamo@main on every run — published # wheels lag behind features (e.g. the agent_trace sink), so we need source # builds to test the surface this package actually depends on. Cargo cache @@ -59,6 +59,7 @@ jobs: with: node-version: "22" cache: "npm" + cache-dependency-path: pi-plugin/package-lock.json - name: Setup Python uses: actions/setup-python@v5 @@ -93,6 +94,7 @@ jobs: key: hf-tokenizer-${{ env.DYNAMO_TEST_MODEL_ID }} - name: Install npm dependencies + working-directory: pi-plugin run: npm ci - name: Install system build deps @@ -128,7 +130,7 @@ jobs: - name: Run integration smoke test env: SMOKE_KEEP_LOGS: "1" - run: ./scripts/integration-smoke.sh + run: ./pi-plugin/scripts/integration-smoke.sh - name: Upload trace JSONL on success if: success() diff --git a/CLAUDE.md b/CLAUDE.md index 45c4c38..c8e5403 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -3,28 +3,40 @@ SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All SPDX-License-Identifier: Apache-2.0 --> -# pi-dynamo-provider +# Dynamo agent plugins -Pi extension registering a `dynamo` provider for Dynamo's OpenAI-compatible chat-completions endpoint. Three source files in `src/` (~650 lines total): +Repo layout: -- `index.ts` — extension entrypoint; calls `readDynamoConfig`, discovers models via `/v1/models`, registers the provider, wires the tool-event relay. 
-- `dynamo-provider.ts` — config + agent_context construction + streamSimple wrapper. Reads `DYN_REQUEST_TRACE`, `DYN_AGENT_*`, and `PI_SUBAGENT_*` env vars. Gated by the `DYN_REQUEST_TRACE` master switch: when set, emits `nvext.agent_context` on every LLM request and sends `trajectory_final` at trajectory end; when unset, registers a plain `dynamo/` provider. -- `tool-relay.ts` — ZMQ PUSH publisher for Pi tool events. Connects to a Dynamo-bound PULL endpoint. Wire format: `[topic, seq_be_u64, msgpack(RequestTraceRecord)]`. +- `pi-plugin/` — Pi extension registering a `dynamo` provider for Dynamo's OpenAI-compatible chat-completions endpoint. +- `hermes-plugin/` — Hermes middleware plugin that injects Dynamo trajectory headers from Hermes `session_id`. + +The Pi plugin has three source files under `pi-plugin/src/`: + +- `index.ts` — thin re-export of the light implementation. +- `light/provider.ts` — config + streamSimple wrapper. Reads `DYN_REQUEST_TRACE`, `DYN_AGENT_*`, and `PI_SUBAGENT_*` env vars. When tracing is enabled, stamps `x-dynamo-trajectory-id` / parent headers and leaves Pi `sessionId` untouched. +- `light/tool-relay.ts` — ZMQ PUSH publisher for Pi tool events. Connects to a Dynamo-bound PULL endpoint. Wire format: `[topic, seq_be_u64, msgpack(RequestTraceRecord)]`. ## Build, test, check ```bash +cd pi-plugin npm install npm run check # tsc --noEmit (strict + exactOptionalPropertyTypes + noUncheckedIndexedAccess) npm test # vitest run npm run build # tsc -p tsconfig.build.json → dist/ ``` -Tests live in `test/` as siblings of `src/`. Use vitest's `describe`/`it`/`expect`. Mirror the existing structure: one test file per source file, fixture data inline rather than separate fixture files.
+ +`pi-plugin/test/integration/smoke.mjs` is the out-of-band end-to-end check — driven by `pi-plugin/scripts/integration-smoke.sh`, not vitest. It boots Dynamo's frontend + mocker, sends one real chat completion, and asserts `x-dynamo-trajectory-id` becomes `trajectory_id` in the request trace JSONL. Two cases: top-level trajectory id and the pi-subagents bridge. Mocker output is garbage; assertions only target the trace envelope. CI clones `ai-dynamo/dynamo@main` and builds from source. Cargo cache keeps warm runs ~60-90s, cold ~10 min. `workflow_dispatch` accepts a `dynamo_ref` input for ad-hoc validation against a specific branch, tag, or SHA. -`test/integration/smoke.mjs` is the out-of-band end-to-end check — driven by `scripts/integration-smoke.sh`, not vitest. It boots Dynamo's frontend + mocker, sends one real chat completion, and asserts `nvext.agent_context` round-trips into the request trace JSONL. Two cases: top-level agent_context and the pi-subagents bridge. Mocker output is garbage; assertions only target the trace envelope. CI clones `ai-dynamo/dynamo@main` and builds from source. Cargo cache keeps warm runs ~60-90s, cold ~10 min. `workflow_dispatch` accepts a `dynamo_ref` input for ad-hoc validation against a specific branch, tag, or SHA. +For real Pi CLI lifecycle validation against a Dynamo endpoint, read `pi-plugin/skills/pi-headless-dynamo/SKILL.md` first and drive the actual interactive Pi TUI instead of faking provider requests or pi-subagents env. -For real Pi CLI lifecycle validation against a Dynamo endpoint, read `skills/pi-headless-dynamo/SKILL.md` first and drive the actual interactive Pi TUI instead of faking provider requests or pi-subagents env. +Hermes plugin validation: + +```bash +python3 -m unittest discover -s hermes-plugin/tests +``` ## Coding standards @@ -35,21 +47,21 @@ For real Pi CLI lifecycle validation against a Dynamo endpoint, read `skills/pi- - No emojis anywhere in code or comments. 
- Mermaid diagrams in markdown, not ASCII art. - Comments explain WHY, not WHAT. Read the bridge block in `readDynamoConfig` for the tone — it covers the non-obvious env-var inheritance behavior in a few lines. -- No new top-level exports unless they're part of the public surface; the package re-exports `dynamo-provider` and `tool-relay` from `index.ts`, that's the entire API. +- No new Pi top-level exports unless they're part of the public surface; `pi-plugin/src/index.ts` is the package API. ## Architecture invariants -- **One-way knowledge flow**: pi-dynamo-provider knows about pi-subagents' env contract (`PI_SUBAGENT_*` vars). pi-subagents never knows about us. Keep it that way — don't propose changes to pi-subagents to fix problems we can solve here. +- **One-way knowledge flow**: `pi-plugin` knows about pi-subagents' env contract (`PI_SUBAGENT_*` vars). pi-subagents never knows about us. Keep it that way — don't propose changes to pi-subagents to fix problems we can solve here. - **No `pi-mono` core patches**. Everything we want must be expressible through the public `ExtensionAPI` (`registerProvider`, `streamSimple` wrapper, tool-event hooks). If you find yourself wanting a Pi core change, the answer is almost always "find a different angle in this repo first." - **Dynamo owns the ZMQ bind side** for tool events. We're a PUSH connect-side producer. Don't try to bind. -- **Trace data is best-effort, not durable**. Don't add retry loops, persistent queues, or back-pressure that would block Pi. The `DynamoToolEventPublisher` drops events when its bounded queue is full; that's correct. +- **Trace data is best-effort, not durable**. Don't add retry loops, persistent queues, or back-pressure that would block Pi/Hermes. The Pi `DynamoToolEventPublisher` drops events when its bounded queue is full; that's correct. 
## Env-var naming contract | Prefix | Direction | Examples | |---|---|---| | `DYNAMO_*` | client config (we read) | `DYNAMO_BASE_URL`, `DYNAMO_API_KEY` | -| `DYN_AGENT_*` | dynamo agent context (we read + emit) | `DYN_AGENT_SESSION_ID`, `DYN_AGENT_TRAJECTORY_ID` | +| `DYN_AGENT_*` | optional trajectory override / subagent parent link | `DYN_AGENT_TRAJECTORY_ID`, `DYN_AGENT_PARENT_TRAJECTORY_ID` | | `DYN_REQUEST_TRACE*` | request trace switch and tool bridge | `DYN_REQUEST_TRACE`, `DYN_REQUEST_TRACE_TOOL_EVENTS_ZMQ_ENDPOINT` | | `PI_SUBAGENT_*` | pi-subagents bookkeeping (we read only) | `PI_SUBAGENT_CHILD`, `PI_SUBAGENT_RUN_ID`, `PI_SUBAGENT_CHILD_AGENT`, `PI_SUBAGENT_CHILD_INDEX` | | `OPENAI_BASE_URL` | OpenAI-compatibility fallback (we read) | only consulted when `DYNAMO_BASE_URL` is unset | @@ -71,7 +83,6 @@ External contributions are not currently accepted. This is an NVIDIA-internal co ## What to leave alone -- The `nvext.agent_context` schema field names match ATIF (`session_type_id`, `session_id`, `trajectory_id`, `parent_trajectory_id`). Don't rename them — downstream tooling in Dynamo's converter and benchmark stack joins on these. -- The `phase: "reasoning"` field is deliberately hardcoded; it tags the LLM call as an agent reasoning step (vs. e.g. a synthesis or grading step). Adding other phase values requires Dynamo-side coordination. +- Dynamo owns the request trace schema. The Pi provider stamps trajectory headers for LLM requests and keeps explicit tool calls on the ZMQ trace path. The Hermes plugin only stamps request headers. - The `request.trace.v1` schema is owned upstream by Dynamo (`dynamo/lib/llm/src/request_trace/`). Don't change record shapes here without an upstream PR landing first. -- `package-lock.json` churn from npm version differences should be reverted before committing (`git checkout -- package-lock.json` if a no-op edit appears). 
+- `pi-plugin/package-lock.json` churn from npm version differences should be reverted before committing (`git checkout -- pi-plugin/package-lock.json` if a no-op edit appears). diff --git a/README.md b/README.md index b780534..0dd3877 100644 --- a/README.md +++ b/README.md @@ -1,166 +1,10 @@ -# pi-dynamo-provider +# Dynamo Agent Plugins -A Pi extension that registers a `dynamo` provider backed by [Dynamo](https://github.com/ai-dynamo/dynamo)'s OpenAI-compatible endpoint, so Pi can use Dynamo as a normal model: +Small agent integrations for Dynamo request tracing. -```bash -pi --model dynamo/ -``` +## Layout -With one switch (`DYN_REQUEST_TRACE=1`) it also tags every request for Dynamo's request trace, gives each pi-subagent its own trajectory id, and can relay Pi tool events into the trace — all without patching `pi-mono`. +- `pi-plugin/` - Pi provider plugin for Dynamo's OpenAI-compatible endpoint. +- `hermes-plugin/` - Hermes middleware plugin that maps Hermes `session_id` to `x-dynamo-trajectory-id`. -## What it does - -- **Model provider** — registers `dynamo`, discovers models from `/v1/models` (falls back to `dynamo/default`), and streams via Pi's OpenAI-compatible path. -- **Agent context** — injects `nvext.agent_context` (session/trajectory identity) so Dynamo can attribute each LLM request in its trace. -- **Trajectory-native KV release** — gives each [pi-subagents](https://github.com/nicobailon/pi-subagents) child its own `trajectory_id`; Dynamo/SGLang tag requests by that id and release it when the trajectory finishes. See [Trajectory-native KV release](#trajectory-native-kv-release). -- **Tool-event relay** — optionally pushes Pi `tool_start` / `tool_end` / `tool_error` events to Dynamo over ZMQ so one trace shows LLM spans and tool spans together. - -Everything but the bare model provider is gated by the `DYN_REQUEST_TRACE` master switch and is off by default. 
- -## Install - -```bash -# From this repo -pi install git:git@github.com:ai-dynamo/pi-dynamo-provider.git - -# Or from a local checkout (after `npm install && npm run build`) -pi install /absolute/path/to/pi-dynamo-provider - -# Or try it for a single run, no install -pi -e ./src/index.ts --model dynamo/ -``` - -## Quick start - -Point Pi at a running Dynamo endpoint: - -```bash -export DYNAMO_BASE_URL=http://127.0.0.1:8000/v1 -export DYNAMO_API_KEY=dummy # local Dynamo usually ignores this; defaults to dynamo-local -export DYN_REQUEST_TRACE=1 # opt into agent_context + trajectory finality - -pi --model dynamo/ -p "Reply exactly ok." -``` - -That's the whole required setup. Everything else (`session_type_id`, `trajectory_id`, `session_id`) has a sensible default and is only set when you want to override it — see [Configuration](#configuration). - -## Trajectory-native KV release - -Agentic runs spawn short-lived subagents that accumulate KV cache, use it for a few turns, then exit. Left in the shared radix tree, that ephemeral KV competes with the lead agent's long-lived prefix for eviction. Dynamo's session radix cache tags each request by `agent_context.trajectory_id` and bulk-releases that trajectory on `trajectory_final=true`. - -When `DYN_REQUEST_TRACE=1`, the provider drives that lifecycle through `nvext.agent_context`: - -```mermaid -sequenceDiagram - participant Root as Root pi process - participant Child as Subagent pi process - participant Dynamo - Root->>Dynamo: normal turn: trajectory_id = T_root - Child->>Dynamo: normal turn: trajectory_id = T_child
parent_trajectory_id = T_root - Child->>Dynamo: agent_end: trajectory_id = T_child
trajectory_final = true - Root->>Dynamo: quit: trajectory_id = T_root
trajectory_final = true -``` - -- The child `trajectory_id` is the subagent's own identity (`PI_SUBAGENT_RUN_ID:PI_SUBAGENT_CHILD_AGENT:PI_SUBAGENT_CHILD_INDEX`), so it needs no extra operator setup. -- `parent_trajectory_id` is lineage only: it is present in subagents and absent in the root. -- Subagent finality fires on `agent_end` (with `session_shutdown` as a backstop). Root finality fires only on `session_shutdown` reason `quit`. - -Requires a Dynamo frontend in `--router-mode kv` and an SGLang worker launched with `--enable-session-radix-cache`. Against any other backend the `agent_context` metadata remains trace-only. - -> The provider also links parent/child **trajectory ids** for tracing when `DYN_AGENT_TRAJECTORY_ID` is set on the root. See [Trajectory linking](#trajectory-linking). - -## Configuration - -The only thing you must set is the connection (`DYNAMO_BASE_URL`) and, to enable the agentic features, `DYN_REQUEST_TRACE`. Everything below is an optional override. - -| Variable | Default | Purpose | -| --- | --- | --- | -| `DYNAMO_BASE_URL` | `http://127.0.0.1:8000/v1` | Dynamo endpoint root (falls back to `OPENAI_BASE_URL`). | -| `DYNAMO_API_KEY` | `dynamo-local` | Bearer token. | -| `DYN_REQUEST_TRACE` | off | **Master switch.** When truthy (`1`/`true`/`yes`/`on`), enables `agent_context`, trajectory finality, and the tool relay. | -| `DYN_AGENT_SESSION_TYPE_ID` | `pi_coding_agent` | Session class in the trace. | -| `DYN_AGENT_SESSION_ID` | Pi session id | Top-level run id. | -| `DYN_AGENT_TRAJECTORY_ID` | Pi session id | Trajectory id; also enables parent/child [trajectory linking](#trajectory-linking) for subagents. | -| `DYN_AGENT_PARENT_TRAJECTORY_ID` | unset | Parent trajectory; set manually to override the bridge. | -| `DYN_REQUEST_TRACE_TOOL_EVENTS_ZMQ_ENDPOINT` | unset | Dynamo-bound ZMQ PULL endpoint for the tool relay. 
| - -`PI_SUBAGENT_CHILD` / `PI_SUBAGENT_RUN_ID` / `PI_SUBAGENT_CHILD_AGENT` / `PI_SUBAGENT_CHILD_INDEX` are **read, never set** — pi-subagents populates them and the provider uses them to derive the child `trajectory_id` and parent link. - -
-Injected request metadata - -With `DYN_REQUEST_TRACE` on, each request payload gets: - -```json -{ - "nvext": { - "agent_context": { - "session_type_id": "pi_coding_agent", - "session_id": "", - "trajectory_id": "", - "phase": "reasoning" - } - } -} -``` - -Existing `nvext` fields are preserved, and `x-request-id` is added when absent. Subagent requests include `parent_trajectory_id`; final requests also include `trajectory_final: true`. -
- -
-Tool-event wire format - -When a tool-event endpoint is set, Pi connects a ZMQ PUSH socket and sends one multipart message per event: - -```text -[topic, seq_be_u64, msgpack(RequestTraceRecord)] -``` - -The record uses Dynamo's `dynamo.request.trace.v1` schema (`event_type`, `event_source`, `agent_context`, and a `tool` object with timing/status). Dynamo owns the PULL bind side, so multiple Pi processes and subagents can all connect as producers. Terminal `tool_end` / `tool_error` records are self-contained. -
- -## Trajectory linking - -The provider keeps parent and child trajectory ids distinct. When a pi-subagents child inherits the parent's `DYN_AGENT_TRAJECTORY_ID`, the provider reinterprets it as the child's `parent_trajectory_id` and synthesizes a fresh child `trajectory_id` (`runId:childAgent:childIndex`), mutating `process.env` so nested chains stay attributable. Setting `DYN_AGENT_PARENT_TRAJECTORY_ID` manually overrides the parent link. If you don't set `DYN_AGENT_TRAJECTORY_ID` at all, every subagent still gets its own child trajectory id — only the explicit parent→child link is absent. - -## Local Dynamo - -Two helper scripts onboard a local Dynamo for testing: - -```bash -./scripts/install-dynamo.sh # clone + build Dynamo into a cache dir via uv + maturin -./scripts/launch-agg-agent.sh # serve GLM-4.7-Flash: one frontend + one SGLang worker -``` - -`launch-agg-agent.sh` uses file discovery + TCP + ZMQ (no NATS/etcd), enables session radix cache and JSONL tracing, and prints the exact Pi env to use. Common overrides: - -```bash -./scripts/launch-agg-agent.sh --gpu 1 # different single GPU -./scripts/launch-agg-agent.sh --gpu 0,1 --tp 2 # one worker across two GPUs -./scripts/launch-agg-agent.sh -- --disable-cuda-graph # forward flags to dynamo.sglang -``` - -> Trajectory-native release additionally needs `--router-mode kv` on the frontend so Dynamo can route the internal close to the worker that owns the tag. - -## Development - -```bash -npm install -npm run check # tsc --noEmit (strict) -npm run test # vitest -npm run build # -> dist/ -``` - -`scripts/integration-smoke.sh` boots Dynamo's frontend + mocker and asserts the `nvext` envelope round-trips into the trace; it is the out-of-band end-to-end check. - -## Troubleshooting - -- **`/v1/models` empty** — wait for the backend to load; confirm frontend and worker share the same discovery/request/event planes and `DYN_FILE_KV`. 
-- **Model unknown** — `curl "$DYNAMO_BASE_URL/models"` and use the returned id as `dynamo/`; restart Pi if discovery failed before Dynamo was ready. -- **No agent_context / 400 on requests** — make sure `DYN_REQUEST_TRACE` is set; the provider injects nothing without it. -- **Tool spans missing** — set a tool-event endpoint on both sides and confirm the run actually used tools. -- **No trajectory release** — needs `DYN_REQUEST_TRACE=1`, `--router-mode kv`, and a worker with `--enable-session-radix-cache`. - -## Scope - -No `pi-mono` core changes, no native Rust ABI, no Dynamo launch management beyond the helper scripts. The `nvext` and `request.trace.v1` schemas are owned upstream by Dynamo. +Each plugin owns its own tests and install instructions. diff --git a/hermes-plugin/README.md b/hermes-plugin/README.md new file mode 100644 index 0000000..898a8c1 --- /dev/null +++ b/hermes-plugin/README.md @@ -0,0 +1,16 @@ +# Hermes Dynamo Trajectory Plugin + +Hermes middleware plugin that copies the current Hermes `session_id` into Dynamo's `x-dynamo-trajectory-id` request header. + +## Install + +```bash +hermes plugins install /absolute/path/to/repo/hermes-plugin +hermes plugins enable dynamo_trajectory +``` + +## Validate + +```bash +python3 -m unittest discover -s hermes-plugin/tests +``` diff --git a/hermes-plugin/__init__.py b/hermes-plugin/__init__.py new file mode 100644 index 0000000..d1a76da --- /dev/null +++ b/hermes-plugin/__init__.py @@ -0,0 +1,23 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+# SPDX-License-Identifier: Apache-2.0 + +"""Inject Hermes session IDs as Dynamo trajectory headers.""" + +HEADER = "x-dynamo-trajectory-id" + + +def register(ctx) -> None: + ctx.register_middleware("llm_request", add_dynamo_trajectory_header) + + +def add_dynamo_trajectory_header(**kwargs): + session_id = str(kwargs.get("session_id") or "").strip() + if not session_id: + return None + + request = dict(kwargs.get("request") or {}) + raw_headers = request.get("extra_headers") + headers = dict(raw_headers) if isinstance(raw_headers, dict) else {} + headers.setdefault(HEADER, session_id) + request["extra_headers"] = headers + return {"request": request} diff --git a/hermes-plugin/plugin.yaml b/hermes-plugin/plugin.yaml new file mode 100644 index 0000000..306362f --- /dev/null +++ b/hermes-plugin/plugin.yaml @@ -0,0 +1,4 @@ +name: dynamo_trajectory +version: "0.1.0" +description: "Optional Dynamo trajectory header injection for Hermes." +author: NVIDIA diff --git a/hermes-plugin/tests/test_dynamo_trajectory_plugin.py b/hermes-plugin/tests/test_dynamo_trajectory_plugin.py new file mode 100644 index 0000000..3567f9b --- /dev/null +++ b/hermes-plugin/tests/test_dynamo_trajectory_plugin.py @@ -0,0 +1,71 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+# SPDX-License-Identifier: Apache-2.0 + +import importlib.util +import pathlib +import types +import unittest + + +PLUGIN_PATH = pathlib.Path(__file__).resolve().parents[1] / "__init__.py" + + +def load_plugin(): + spec = importlib.util.spec_from_file_location("dynamo_trajectory_plugin", PLUGIN_PATH) + module = importlib.util.module_from_spec(spec) + assert spec.loader is not None + spec.loader.exec_module(module) + return module + + +class DynamoTrajectoryPluginTest(unittest.TestCase): + def test_adds_session_id_as_dynamo_trajectory_header(self): + plugin = load_plugin() + + result = plugin.add_dynamo_trajectory_header( + session_id="hermes-session", + request={"model": "qwen", "extra_headers": {"x-test": "1"}}, + ) + + self.assertEqual( + result, + { + "request": { + "model": "qwen", + "extra_headers": { + "x-test": "1", + "x-dynamo-trajectory-id": "hermes-session", + }, + } + }, + ) + + def test_preserves_explicit_dynamo_trajectory_header(self): + plugin = load_plugin() + + result = plugin.add_dynamo_trajectory_header( + session_id="hermes-session", + request={"extra_headers": {"x-dynamo-trajectory-id": "explicit"}}, + ) + + self.assertEqual(result["request"]["extra_headers"]["x-dynamo-trajectory-id"], "explicit") + + def test_skips_without_session_id(self): + plugin = load_plugin() + + self.assertIsNone(plugin.add_dynamo_trajectory_header(request={"model": "qwen"})) + + def test_registers_llm_request_middleware(self): + plugin = load_plugin() + calls = [] + ctx = types.SimpleNamespace( + register_middleware=lambda kind, callback: calls.append((kind, callback)) + ) + + plugin.register(ctx) + + self.assertEqual(calls, [("llm_request", plugin.add_dynamo_trajectory_header)]) + + +if __name__ == "__main__": + unittest.main() diff --git a/pi-plugin/README.md b/pi-plugin/README.md new file mode 100644 index 0000000..c43d5ac --- /dev/null +++ b/pi-plugin/README.md @@ -0,0 +1,136 @@ +# pi-dynamo-provider + +A Pi extension that registers a `dynamo` provider backed by 
[Dynamo](https://github.com/ai-dynamo/dynamo)'s OpenAI-compatible endpoint, so Pi can use Dynamo as a normal model: + +```bash +pi --model dynamo/<model-id> +``` + +With one switch (`DYN_REQUEST_TRACE=1`) it also stamps Dynamo trajectory headers, gives each pi-subagent its own trajectory id, and can relay Pi tool events into the trace — all without patching `pi-mono`. + +## What it does + +- **Model provider** — registers `dynamo`, discovers models from `/v1/models` (falls back to `dynamo/default`), and streams via Pi's OpenAI-compatible path. +- **Trajectory headers** — adds `x-dynamo-trajectory-id` and optional parent headers so Dynamo can attribute each LLM request as a trajectory in its trace. +- **Subagent trajectory ids** — gives each [pi-subagents](https://github.com/nicobailon/pi-subagents) child its own trajectory id. See [Subagent trajectory ids](#subagent-trajectory-ids). +- **Tool-event relay** — optionally pushes Pi `tool_start` / `tool_end` / `tool_error` events to Dynamo over ZMQ so one trace shows LLM spans and tool spans together. + +Everything but the bare model provider is gated by the `DYN_REQUEST_TRACE` master switch and is off by default. + +## Install + +```bash +# From a local checkout, after `npm install && npm run build` +pi install /absolute/path/to/pi-dynamo-provider/pi-plugin + +# Or try it for a single run, no install +cd pi-plugin +pi -e ./src/index.ts --model dynamo/<model-id> +``` + +## Quick start + +Point Pi at a running Dynamo endpoint: + +```bash +export DYNAMO_BASE_URL=http://127.0.0.1:8000/v1 +export DYNAMO_API_KEY=dummy # local Dynamo usually ignores this; defaults to dynamo-local +export DYN_REQUEST_TRACE=1 # opt into trajectory tracing + optional tool relay + +pi --model dynamo/<model-id> -p "Reply exactly ok." +``` + +That's the whole required setup. Everything else is only set when you want to override it — see [Configuration](#configuration).
+ +## Subagent trajectory ids + +When `DYN_REQUEST_TRACE=1`, the provider preserves Pi's normal `sessionId` and adds explicit Dynamo trajectory headers. + +```mermaid +sequenceDiagram + participant Root as Root pi process + participant Child as Subagent pi process + participant Dynamo + Root->>Dynamo: x-dynamo-trajectory-id = S_root + Child->>Dynamo: x-dynamo-trajectory-id = T_child, parent = S_root +``` + +- The root `trajectory_id` is Pi's own `sessionId`. +- The child `trajectory_id` is the subagent's own identity (`PI_SUBAGENT_RUN_ID:PI_SUBAGENT_CHILD_AGENT:PI_SUBAGENT_CHILD_INDEX`), so it needs no extra operator setup. +- The provider sends those values as `x-dynamo-trajectory-id` and `x-dynamo-parent-trajectory-id`. + +> ZMQ tool records can include parent/child **trajectory ids** when `DYN_AGENT_TRAJECTORY_ID` is set on the root. See [Trajectory linking](#trajectory-linking). + +## Configuration + +The only thing you must set is the connection (`DYNAMO_BASE_URL`) and, to enable the agentic features, `DYN_REQUEST_TRACE`. Everything below is an optional override. + +| Variable | Default | Purpose | +| --- | --- | --- | +| `DYNAMO_BASE_URL` | `http://127.0.0.1:8000/v1` | Dynamo endpoint root (falls back to `OPENAI_BASE_URL`). | +| `DYNAMO_API_KEY` | `dynamo-local` | Bearer token. | +| `DYN_REQUEST_TRACE` | off | **Master switch.** When truthy (`1`/`true`/`yes`/`on`), enables Dynamo trajectory headers and the tool relay. | +| `DYN_AGENT_TRAJECTORY_ID` | unset | Optional parent trajectory seed for [trajectory linking](#trajectory-linking) in subagents. | +| `DYN_AGENT_PARENT_TRAJECTORY_ID` | unset | Parent trajectory; set manually to override the bridge. | +| `DYN_REQUEST_TRACE_TOOL_EVENTS_ZMQ_ENDPOINT` | unset | Dynamo-bound ZMQ PULL endpoint for the tool relay. 
| + +`PI_SUBAGENT_CHILD` / `PI_SUBAGENT_RUN_ID` / `PI_SUBAGENT_CHILD_AGENT` / `PI_SUBAGENT_CHILD_INDEX` are **read, never set** — pi-subagents populates them and the provider uses them to derive the child `trajectory_id` and parent link. + +With `DYN_REQUEST_TRACE` on, the provider does not mutate request payloads. It adds Dynamo trajectory headers and `x-request-id` when absent. + +<details>
+<summary>Tool-event wire format</summary> + +When a tool-event endpoint is set, Pi connects a ZMQ PUSH socket and sends one multipart message per event: + +```text +[topic, seq_be_u64, msgpack(RequestTraceRecord)] +``` + +The record uses Dynamo's `dynamo.request.trace.v1` schema (`event_type`, `event_source`, `agent_context`, and a `tool` object with timing/status). Dynamo owns the PULL bind side, so multiple Pi processes and subagents can all connect as producers. Terminal `tool_end` / `tool_error` records are self-contained. +</details>
+ +## Trajectory linking + +The provider keeps parent and child trajectory ids distinct for ZMQ tool records. When a pi-subagents child inherits the parent's `DYN_AGENT_TRAJECTORY_ID`, the provider reinterprets it as the child's `parent_trajectory_id` and synthesizes a fresh child `trajectory_id` (`runId:childAgent:childIndex`), mutating `process.env` so nested chains stay attributable. Setting `DYN_AGENT_PARENT_TRAJECTORY_ID` manually overrides the parent link. If you don't set `DYN_AGENT_TRAJECTORY_ID` at all, every subagent still gets its own child trajectory id — only the explicit parent-to-child link is absent. + +## Local Dynamo + +Two helper scripts onboard a local Dynamo for testing: + +```bash +cd pi-plugin +./scripts/install-dynamo.sh # clone + build Dynamo into a cache dir via uv + maturin +./scripts/launch-agg-agent.sh # serve GLM-4.7-Flash: one frontend + one SGLang worker +``` + +`launch-agg-agent.sh` uses file discovery + TCP + ZMQ (no NATS/etcd), enables JSONL tracing, and prints the exact Pi env to use. Common overrides: + +```bash +cd pi-plugin +./scripts/launch-agg-agent.sh --gpu 1 # different single GPU +./scripts/launch-agg-agent.sh --gpu 0,1 --tp 2 # one worker across two GPUs +./scripts/launch-agg-agent.sh -- --disable-cuda-graph # forward flags to dynamo.sglang +``` + +## Development + +```bash +npm install +npm run check # tsc --noEmit (strict) +npm run test # vitest +npm run build # -> dist/ +``` + +`scripts/integration-smoke.sh` boots Dynamo's frontend + mocker and asserts `x-dynamo-trajectory-id` becomes `trajectory_id` in the trace; it is the out-of-band end-to-end check. + +## Troubleshooting + +- **`/v1/models` empty** — wait for the backend to load; confirm frontend and worker share the same discovery/request/event planes and `DYN_FILE_KV`. +- **Model unknown** — `curl "$DYNAMO_BASE_URL/models"` and use the returned id as `dynamo/<model-id>`; restart Pi if discovery failed before Dynamo was ready.
+- **No agent_context in trace rows** — make sure `DYN_REQUEST_TRACE` is set and Dynamo is new enough to map `x-dynamo-trajectory-id`. +- **Tool spans missing** — set a tool-event endpoint on both sides and confirm the run actually used tools. + +## Scope + +No `pi-mono` core changes, no native Rust ABI, no Dynamo launch management beyond the helper scripts. The request trace schema is owned upstream by Dynamo. diff --git a/package-lock.json b/pi-plugin/package-lock.json similarity index 100% rename from package-lock.json rename to pi-plugin/package-lock.json diff --git a/package.json b/pi-plugin/package.json similarity index 94% rename from package.json rename to pi-plugin/package.json index 74957c4..d48d095 100644 --- a/package.json +++ b/pi-plugin/package.json @@ -1,7 +1,7 @@ { "name": "pi-dynamo-provider", "version": "0.1.0", - "description": "Pi extension package that registers a Dynamo OpenAI-compatible provider with agent-context tracing metadata.", + "description": "Pi extension package that registers a Dynamo OpenAI-compatible provider with trajectory headers.", "type": "module", "main": "./dist/index.js", "types": "./dist/index.d.ts", diff --git a/scripts/install-dynamo.sh b/pi-plugin/scripts/install-dynamo.sh similarity index 100% rename from scripts/install-dynamo.sh rename to pi-plugin/scripts/install-dynamo.sh diff --git a/scripts/integration-smoke.sh b/pi-plugin/scripts/integration-smoke.sh similarity index 95% rename from scripts/integration-smoke.sh rename to pi-plugin/scripts/integration-smoke.sh index 8e615bf..ed50f29 100755 --- a/scripts/integration-smoke.sh +++ b/pi-plugin/scripts/integration-smoke.sh @@ -3,8 +3,8 @@ # SPDX-License-Identifier: Apache-2.0 # # Start a Dynamo frontend + mocker worker against a known-good Dynamo version, -# then run test/integration/smoke.mjs which asserts that nvext.agent_context -# round-trips end-to-end through the trace sink. Tears down processes on exit. 
+# then run test/integration/smoke.mjs which asserts that x-dynamo-trajectory-id +# becomes trajectory identity in the trace sink. Tears down processes on exit. # # Required env: # DYNAMO_TEST_MODEL_ID HuggingFace model id for the mocker tokenizer @@ -68,7 +68,7 @@ wait_for_http() { echo "smoke: trace dir = ${TRACE_DIR}" -# Build the provider so the smoke test can import dist/dynamo-provider.js. +# Build the provider so the smoke test can import dist/light/provider.js. echo "smoke: building pi-dynamo-provider" (cd "${REPO_ROOT}" && npm run build >/dev/null) diff --git a/scripts/launch-agg-agent.sh b/pi-plugin/scripts/launch-agg-agent.sh similarity index 99% rename from scripts/launch-agg-agent.sh rename to pi-plugin/scripts/launch-agg-agent.sh index 35f6e0c..5b73096 100755 --- a/scripts/launch-agg-agent.sh +++ b/pi-plugin/scripts/launch-agg-agent.sh @@ -181,7 +181,6 @@ Perfetto conversion: python benchmarks/request_trace/convert_to_perfetto.py \\ ${TRACE_PATH} \\ --include-markers \\ - --separate-stage-tracks \\ --output ${RUN_DIR}/dynamo-request-trace.perfetto.json EOF diff --git a/skills/pi-headless-dynamo/SKILL.md b/pi-plugin/skills/pi-headless-dynamo/SKILL.md similarity index 81% rename from skills/pi-headless-dynamo/SKILL.md rename to pi-plugin/skills/pi-headless-dynamo/SKILL.md index 6a2395a..990d691 100644 --- a/skills/pi-headless-dynamo/SKILL.md +++ b/pi-plugin/skills/pi-headless-dynamo/SKILL.md @@ -1,6 +1,6 @@ --- name: pi-headless-dynamo -description: Drive the real Pi CLI headlessly against a Dynamo or OpenAI-compatible endpoint for pi-dynamo-provider validation. Use when testing Pi provider installs, agent_context tracing, trajectory-native lifecycle release, Pi subagent runs, saved traces, or parent/child session behavior without manually faking Pi or pi-subagents internals. +description: Drive the real Pi CLI headlessly against a Dynamo or OpenAI-compatible endpoint for pi-dynamo-provider validation. 
Use when testing Pi provider installs, trajectory header tracing, Pi subagent runs, saved traces, or parent/child trajectory behavior without manually faking Pi or pi-subagents internals. --- # Pi Headless Dynamo @@ -20,7 +20,7 @@ directly to stand in for Pi, or patch pi-subagents while validating this repo. Use a running Dynamo endpoint or start one with the repo launcher: ```bash -scripts/launch-agg-agent.sh --dynamo-dir /ephemeral/dynamo-radix-native --gpu 0,1 --tp 2 --http-port 18083 --system-port 18084 +pi-plugin/scripts/launch-agg-agent.sh --dynamo-dir /ephemeral/dynamo-radix-native --gpu 0,1 --tp 2 --http-port 18083 --system-port 18084 ``` Before launching Pi, verify the endpoint and model: @@ -60,16 +60,12 @@ Control that process through its PTY like a user: - paste a full prompt or slash command as text; - send Enter to submit; - wait for Pi to finish before sending the next prompt; -- type `/quit` and wait for process exit so Pi emits `session_shutdown` and - the root `trajectory_final`. +- type `/quit` and wait for process exit so Pi shuts down cleanly. Do not kill Pi to end a lifecycle run unless it is hung and the failure is the thing being tested. -`DYN_AGENT_SESSION_TYPE_ID` and `DYN_AGENT_SESSION_ID` are optional labels. The -provider defaults `session_type_id` to `pi_coding_agent`; normal LLM requests -use Pi's own session id when `DYN_AGENT_SESSION_ID` is absent. Set them only -when a run needs stable, human-chosen trace labels. +When `DYN_REQUEST_TRACE=1`, the provider stamps `x-dynamo-trajectory-id` on LLM requests. Normal root turns use Pi's own session id as the trajectory id; pi-subagents children derive their id from `PI_SUBAGENT_*`. ## Drive A Lifecycle Run @@ -151,7 +147,6 @@ rg -n "CHILD_.*_DONE|PARENT_AFTER_CHILDREN_OK|PARENT_FINAL_OK" "$RUN_ROOT/pi-ter jq -s '{ events: length, agent_context_rows: (map(select(.event.agent_context? 
!= null)) | length), - trajectory_final_rows: (map(select(.event.agent_context.trajectory_final == true)) | length), output_tokens_total: (map(.event.request.output_tokens // 0) | add), input_lengths: { min: (map(.event.request.replay.input_length // 0) | min), @@ -169,14 +164,13 @@ nvidia-smi --query-gpu=index,name,memory.used,memory.total --format=csv,noheader The lifecycle ordering to prove: -1. Child trajectories close and release first. -2. Parent-only turns still run after child release. -3. Root trajectory closes only after Pi exits normally. -4. The server is stopped and GPUs return to baseline. +1. Child LLM requests carry child trajectory ids. +2. Parent-only turns still carry the parent trajectory id. +3. The server is stopped and GPUs return to baseline. -With Dynamo request-trace unification (#10701 and later), `agent_context` lives -on the same `dynamo.request.trace.v1` rows as request metrics. If trace rows are -present but `agent_context_rows` is zero, check that Pi had +With Dynamo request-trace unification (#10701 and later), trajectory identity +lives on the same `dynamo.request.trace.v1` rows as request metrics. If trace +rows are present but `agent_context_rows` is zero, check that Pi had `DYN_REQUEST_TRACE=1` and that the provider package was installed from this repo. ## Troubleshooting @@ -192,9 +186,3 @@ present but `agent_context_rows` is zero, check that Pi had trace path points at the wrong run. - Trace rows without `agent_context` usually mean Pi was launched without `DYN_REQUEST_TRACE=1` or with a stale provider install. -- Zero `trajectory_final_rows` after a clean `/quit` usually means the provider - install is stale or the endpoint rejected the close ping; check `frontend.log` - for the final POST status. -- No `release_session` after finality usually means Dynamo was not in - `--router-mode kv`, the worker lacked `--enable-session-radix-cache`, or Pi - was killed instead of exited with `/quit`. 
diff --git a/skills/pi-headless-dynamo/agents/openai.yaml b/pi-plugin/skills/pi-headless-dynamo/agents/openai.yaml similarity index 100% rename from skills/pi-headless-dynamo/agents/openai.yaml rename to pi-plugin/skills/pi-headless-dynamo/agents/openai.yaml diff --git a/pi-plugin/src/index.ts b/pi-plugin/src/index.ts new file mode 100644 index 0000000..c6daf56 --- /dev/null +++ b/pi-plugin/src/index.ts @@ -0,0 +1,5 @@ +// SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +export { default } from "./light/index.js"; +export * from "./light/index.js"; diff --git a/pi-plugin/src/light/index.ts b/pi-plugin/src/light/index.ts new file mode 100644 index 0000000..2a9b5ea --- /dev/null +++ b/pi-plugin/src/light/index.ts @@ -0,0 +1,28 @@ +// SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +import type { ExtensionAPI } from "@mariozechner/pi-coding-agent"; +import { + createDynamoModels, + createDynamoProviderConfig, + DEFAULT_DYNAMO_MODEL_ID, + DYNAMO_PROVIDER_ID, + discoverDynamoModels, + readDynamoConfig, +} from "./provider.js"; +import { registerDynamoToolEventRelay } from "./tool-relay.js"; +import { applySubagentTrajectoryBridge } from "./trajectory.js"; + +export default async function dynamoProviderExtension(pi: ExtensionAPI): Promise { + applySubagentTrajectoryBridge(); + const config = readDynamoConfig(); + const discoveredModels = await discoverDynamoModels(config); + const models = + discoveredModels.length > 0 ? 
discoveredModels : createDynamoModels([DEFAULT_DYNAMO_MODEL_ID], config.baseUrl); + pi.registerProvider(DYNAMO_PROVIDER_ID, createDynamoProviderConfig(config, models.map((model) => ({ ...model })))); + await registerDynamoToolEventRelay(pi, config); +} + +export * from "./provider.js"; +export * from "./tool-relay.js"; +export * from "./trajectory.js"; diff --git a/pi-plugin/src/light/provider.ts b/pi-plugin/src/light/provider.ts new file mode 100644 index 0000000..7349a27 --- /dev/null +++ b/pi-plugin/src/light/provider.ts @@ -0,0 +1,183 @@ +// SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +import { randomUUID } from "node:crypto"; +import { streamSimpleOpenAICompletions } from "@mariozechner/pi-ai"; +import type { + Api, + AssistantMessageEventStream, + Context, + Model, + OpenAICompletionsCompat, + SimpleStreamOptions, +} from "@mariozechner/pi-ai"; +import type { ProviderConfig, ProviderModelConfig } from "@mariozechner/pi-coding-agent"; +import { + envValue, + isTruthyEnv, + resolveTrajectoryContext, + type DynamoTrajectoryEnvironment, +} from "./trajectory.js"; + +export const DYNAMO_PROVIDER_ID = "dynamo"; +export const DYNAMO_API = "dynamo-openai-completions" satisfies Api; +export const DEFAULT_DYNAMO_BASE_URL = "http://127.0.0.1:8000/v1"; +export const DEFAULT_DYNAMO_API_KEY = "dynamo-local"; +export const DEFAULT_DYNAMO_MODEL_ID = "default"; + +export interface DynamoEnvironment extends DynamoTrajectoryEnvironment { + DYNAMO_BASE_URL?: string; + OPENAI_BASE_URL?: string; + DYNAMO_API_KEY?: string; +} + +export interface DynamoConfig { + baseUrl: string; + apiKey: string; + traceEnabled: boolean; + trajectoryId?: string; + parentTrajectoryId?: string; +} + +interface OpenAIModelsResponse { + data?: Array<{ id?: unknown }>; +} + +type OpenAICompletionsStreamSimple = ( + model: Model<"openai-completions">, + context: Context, + options?: SimpleStreamOptions, +) => 
AssistantMessageEventStream; + +type ProviderStreamSimple = NonNullable; + +export function normalizeDynamoBaseUrl(rawBaseUrl: string | undefined): string { + const raw = rawBaseUrl?.trim() || DEFAULT_DYNAMO_BASE_URL; + const withoutTrailingSlash = raw.replace(/\/+$/, ""); + try { + const url = new URL(withoutTrailingSlash); + if (url.pathname === "" || url.pathname === "/") url.pathname = "/v1"; + return url.toString().replace(/\/+$/, ""); + } catch { + return withoutTrailingSlash; + } +} + +export function readDynamoConfig(env: DynamoEnvironment = process.env): DynamoConfig { + const trajectory = resolveTrajectoryContext(env); + return { + baseUrl: normalizeDynamoBaseUrl(envValue(env, "DYNAMO_BASE_URL") ?? envValue(env, "OPENAI_BASE_URL")), + apiKey: envValue(env, "DYNAMO_API_KEY") ?? DEFAULT_DYNAMO_API_KEY, + traceEnabled: isTruthyEnv(envValue(env, "DYN_REQUEST_TRACE")), + ...(trajectory.trajectoryId ? { trajectoryId: trajectory.trajectoryId } : {}), + ...(trajectory.parentTrajectoryId ? { parentTrajectoryId: trajectory.parentTrajectoryId } : {}), + }; +} + +function hasHeader(headers: Record, target: string): boolean { + const normalizedTarget = target.toLowerCase(); + return Object.keys(headers).some((key) => key.toLowerCase() === normalizedTarget); +} + +export function buildDynamoHeaders( + headers: Record | undefined, + config: Pick, + runtimeSessionId: string | undefined, + createRequestId: () => string = randomUUID, +): Record { + const nextHeaders = { ...headers }; + if (!hasHeader(nextHeaders, "x-request-id")) nextHeaders["x-request-id"] = createRequestId(); + if (!config.traceEnabled) return nextHeaders; + + const trajectoryId = config.trajectoryId ?? 
runtimeSessionId; + if (trajectoryId && !hasHeader(nextHeaders, "x-dynamo-trajectory-id")) { + nextHeaders["x-dynamo-trajectory-id"] = trajectoryId; + } + if (config.parentTrajectoryId && !hasHeader(nextHeaders, "x-dynamo-parent-trajectory-id")) { + nextHeaders["x-dynamo-parent-trajectory-id"] = config.parentTrajectoryId; + } + return nextHeaders; +} + +const dynamoOpenAICompat = { + supportsStore: false, + supportsDeveloperRole: false, + supportsReasoningEffort: false, + supportsUsageInStreaming: true, + maxTokensField: "max_tokens", + supportsStrictMode: false, + supportsLongCacheRetention: false, + sendSessionAffinityHeaders: true, +} satisfies OpenAICompletionsCompat; + +export function createDynamoModels(modelIds: string[], baseUrl: string): ProviderModelConfig[] { + const ids = modelIds.length > 0 ? modelIds : [DEFAULT_DYNAMO_MODEL_ID]; + return ids.map((id) => ({ + id, + name: id, + api: DYNAMO_API, + baseUrl, + reasoning: false, + input: ["text"], + cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 }, + contextWindow: 128000, + maxTokens: 8192, + compat: dynamoOpenAICompat, + })); +} + +export async function discoverDynamoModels(config: DynamoConfig, timeoutMs = 2000): Promise { + const controller = new AbortController(); + const timeout = setTimeout(() => controller.abort(), timeoutMs); + try { + const response = await fetch(`${config.baseUrl}/models`, { + headers: { Authorization: `Bearer ${config.apiKey}` }, + signal: controller.signal, + }); + if (!response.ok) return []; + const body = (await response.json()) as OpenAIModelsResponse; + const ids = + body.data?.map((model) => model.id).filter((id): id is string => typeof id === "string" && id.length > 0) ?? 
+ []; + return createDynamoModels([...new Set(ids)], config.baseUrl); + } catch { + return []; + } finally { + clearTimeout(timeout); + } +} + +function toOpenAICompletionsModel(model: Model): Model<"openai-completions"> { + const { api: _api, compat, ...rest } = model; + return { + ...rest, + api: "openai-completions", + compat: (compat as OpenAICompletionsCompat | undefined) ?? dynamoOpenAICompat, + }; +} + +export function createDynamoStreamSimple( + config: DynamoConfig, + delegate: OpenAICompletionsStreamSimple = streamSimpleOpenAICompletions, + createRequestId: () => string = randomUUID, +): ProviderStreamSimple { + return (model: Model, context: Context, options?: SimpleStreamOptions): AssistantMessageEventStream => { + const runtimeSessionId = options?.sessionId?.trim(); + return delegate(toOpenAICompletionsModel(model), context, { + ...options, + apiKey: options?.apiKey ?? config.apiKey, + headers: buildDynamoHeaders(options?.headers, config, runtimeSessionId, createRequestId), + }); + }; +} + +export function createDynamoProviderConfig(config: DynamoConfig, models: ProviderModelConfig[]): ProviderConfig { + return { + name: "Dynamo", + baseUrl: config.baseUrl, + apiKey: config.apiKey, + api: DYNAMO_API, + models, + streamSimple: createDynamoStreamSimple(config), + }; +} diff --git a/src/tool-relay.ts b/pi-plugin/src/light/tool-relay.ts similarity index 58% rename from src/tool-relay.ts rename to pi-plugin/src/light/tool-relay.ts index b2b2a40..5a0d2be 100644 --- a/src/tool-relay.ts +++ b/pi-plugin/src/light/tool-relay.ts @@ -5,7 +5,8 @@ import { Buffer } from "node:buffer"; import { encode } from "@msgpack/msgpack"; import type { ExtensionAPI, ExtensionContext } from "@mariozechner/pi-coding-agent"; import { Push } from "zeromq"; -import type { DynamoEnvironment, DynamoProviderRuntimeConfig } from "./dynamo-provider.js"; +import type { DynamoConfig, DynamoEnvironment } from "./provider.js"; +import { envValue } from "./trajectory.js"; export const 
DEFAULT_TOOL_EVENTS_TOPIC = "agent-tool-events"; export const DEFAULT_TOOL_EVENT_QUEUE_CAPACITY = 100000; @@ -23,33 +24,29 @@ export interface DynamoToolRelayConfig { } export interface DynamoRequestTraceAgentContext { - session_type_id: string; - session_id: string; trajectory_id: string; parent_trajectory_id?: string; } -export type DynamoToolStatus = "running" | "succeeded" | "error" | "cancelled"; -export type DynamoToolTraceEventType = "tool_start" | "tool_end" | "tool_error"; - -export interface DynamoRequestTraceToolEvent { - tool_call_id: string; - tool_class: string; - started_at_unix_ms?: number; - ended_at_unix_ms?: number; - status?: DynamoToolStatus; - duration_ms?: number; - output_bytes?: number; - error_type?: string; -} +type ToolTraceEventType = "tool_start" | "tool_end" | "tool_error"; +type ToolStatus = "running" | "succeeded" | "error"; export interface DynamoRequestTraceRecord { schema: "dynamo.request.trace.v1"; - event_type: DynamoToolTraceEventType; + event_type: ToolTraceEventType; event_time_unix_ms: number; event_source: "harness"; agent_context: DynamoRequestTraceAgentContext; - tool: DynamoRequestTraceToolEvent; + tool: { + tool_call_id: string; + tool_class: string; + started_at_unix_ms?: number; + ended_at_unix_ms?: number; + status?: ToolStatus; + duration_ms?: number; + output_bytes?: number; + error_type?: string; + }; } export interface ToolEventSocket { @@ -60,66 +57,34 @@ export interface ToolEventSocket { export type ToolEventSocketFactory = () => ToolEventSocket; -export interface PiToolExecutionStartEvent { - toolCallId: string; - toolName: string; - args: unknown; -} - -export interface PiToolExecutionEndEvent { - toolCallId: string; - toolName: string; - result: unknown; - isError: boolean; -} - -interface ToolCallStart { +interface ToolStart { agentContext: DynamoRequestTraceAgentContext; - toolName: string; toolClass: string; startedAtUnixMs: number; startedAtPerfMs: number; } -function isRecord(value: unknown): value is 
Record { - return typeof value === "object" && value !== null && !Array.isArray(value); -} - -function getEnvValue(env: DynamoToolRelayEnvironment, key: keyof DynamoToolRelayEnvironment): string | undefined { - const value = env[key]; - const trimmed = value?.trim(); - return trimmed ? trimmed : undefined; -} - function parsePositiveInteger(value: string | undefined, fallback: number): number { - if (!value) return fallback; - const parsed = Number.parseInt(value, 10); + const parsed = value ? Number.parseInt(value, 10) : Number.NaN; return Number.isFinite(parsed) && parsed > 0 ? parsed : fallback; } export function readDynamoToolRelayConfig(env: DynamoToolRelayEnvironment = process.env): DynamoToolRelayConfig { - const endpoint = getEnvValue(env, "DYN_REQUEST_TRACE_TOOL_EVENTS_ZMQ_ENDPOINT"); - + const endpoint = envValue(env, "DYN_REQUEST_TRACE_TOOL_EVENTS_ZMQ_ENDPOINT"); return { ...(endpoint ? { endpoint } : {}), - topic: getEnvValue(env, "DYN_REQUEST_TRACE_TOOL_EVENTS_ZMQ_TOPIC") ?? DEFAULT_TOOL_EVENTS_TOPIC, + topic: envValue(env, "DYN_REQUEST_TRACE_TOOL_EVENTS_ZMQ_TOPIC") ?? DEFAULT_TOOL_EVENTS_TOPIC, queueCapacity: parsePositiveInteger( - getEnvValue(env, "DYN_REQUEST_TRACE_TOOL_EVENTS_QUEUE_CAPACITY"), + envValue(env, "DYN_REQUEST_TRACE_TOOL_EVENTS_QUEUE_CAPACITY"), DEFAULT_TOOL_EVENT_QUEUE_CAPACITY, ), }; } -export function buildDynamoRequestTraceAgentContext( - config: DynamoProviderRuntimeConfig, - sessionId: string | undefined, -): DynamoRequestTraceAgentContext | undefined { +export function buildToolAgentContext(config: DynamoConfig, sessionId: string | undefined) { const trajectoryId = config.trajectoryId ?? sessionId; if (!trajectoryId) return undefined; - return { - session_type_id: config.sessionTypeId, - session_id: config.sessionId ?? trajectoryId, trajectory_id: trajectoryId, ...(config.parentTrajectoryId ? 
{ parent_trajectory_id: config.parentTrajectoryId } : {}), }; @@ -127,27 +92,24 @@ export function buildDynamoRequestTraceAgentContext( export function getToolClass(toolName: string | undefined): string { const name = toolName?.trim(); - if (!name) return "unknown"; - return name.split("---", 1)[0]?.split("/", 1)[0] || "unknown"; + return name ? name.split("---", 1)[0]?.split("/", 1)[0] || "unknown" : "unknown"; } -export function getToolResultOutputBytes(result: unknown): number | undefined { - if (!isRecord(result) || !Array.isArray(result.content)) { +function outputBytes(result: unknown): number | undefined { + if (typeof result !== "object" || result === null || !("content" in result) || !Array.isArray(result.content)) { return undefined; } - - const output = result.content - .map((item) => { - if (isRecord(item) && typeof item.text === "string") { - return item.text; - } - return JSON.stringify(item); - }) + const text = result.content + .map((item: unknown) => + typeof item === "object" && item !== null && "text" in item && typeof item.text === "string" + ? 
item.text + : JSON.stringify(item), + ) .join("\n"); - return Buffer.byteLength(output, "utf8"); + return Buffer.byteLength(text, "utf8"); } -function createSequenceFrame(sequence: bigint): Buffer { +function sequenceFrame(sequence: bigint): Buffer { const frame = Buffer.alloc(8); frame.writeBigUInt64BE(sequence); return frame; @@ -179,22 +141,18 @@ export class DynamoToolEventPublisher { } async start(): Promise { - if (!this.config.endpoint) return; - await this.socket.connect(this.config.endpoint); + if (this.config.endpoint) await this.socket.connect(this.config.endpoint); } publish(record: DynamoRequestTraceRecord): boolean { - if (this.closed || !this.config.endpoint) return false; - if (this.queued >= this.config.queueCapacity) return false; - + if (this.closed || !this.config.endpoint || this.queued >= this.config.queueCapacity) return false; const frames: [Buffer, Buffer, Buffer] = [ this.topicFrame, - createSequenceFrame(this.sequence), + sequenceFrame(this.sequence), Buffer.from(encode(record)), ]; this.sequence += 1n; this.queued += 1; - this.sendChain = this.sendChain .catch(() => undefined) .then(() => this.socket.send(frames)) @@ -217,29 +175,29 @@ export class DynamoToolEventPublisher { } export class DynamoToolEventRelay { - private readonly starts = new Map(); + private readonly starts = new Map(); constructor( - private readonly config: DynamoProviderRuntimeConfig, + private readonly config: DynamoConfig, private readonly publisher: DynamoToolEventPublisher, private readonly nowUnixMs: () => number = () => Date.now(), private readonly nowPerfMs: () => number = () => performance.now(), ) {} - handleToolExecutionStart(event: PiToolExecutionStartEvent, ctx: ExtensionContext): void { - const agentContext = buildDynamoRequestTraceAgentContext(this.config, ctx.sessionManager.getSessionId()); + handleToolExecutionStart( + event: { toolCallId: string; toolName: string; args: unknown }, + ctx: ExtensionContext, + ): void { + const agentContext = 
buildToolAgentContext(this.config, ctx.sessionManager.getSessionId()); if (!agentContext) return; - const startedAtUnixMs = this.nowUnixMs(); const toolClass = getToolClass(event.toolName); this.starts.set(event.toolCallId, { agentContext, - toolName: event.toolName, toolClass, startedAtUnixMs, startedAtPerfMs: this.nowPerfMs(), }); - this.publisher.publish({ schema: "dynamo.request.trace.v1", event_type: "tool_start", @@ -255,23 +213,18 @@ export class DynamoToolEventRelay { }); } - handleToolExecutionEnd(event: PiToolExecutionEndEvent, ctx: ExtensionContext): void { + handleToolExecutionEnd( + event: { toolCallId: string; toolName: string; result: unknown; isError: boolean }, + ctx: ExtensionContext, + ): void { const endedAtUnixMs = this.nowUnixMs(); const endedAtPerfMs = this.nowPerfMs(); const start = this.starts.get(event.toolCallId); this.starts.delete(event.toolCallId); - - const agentContext = - start?.agentContext ?? buildDynamoRequestTraceAgentContext(this.config, ctx.sessionManager.getSessionId()); + const agentContext = start?.agentContext ?? buildToolAgentContext(this.config, ctx.sessionManager.getSessionId()); if (!agentContext) return; - const startedAtUnixMs = start?.startedAtUnixMs ?? endedAtUnixMs; - const durationMs = - start === undefined ? 0 : Math.max(0, Math.round((endedAtPerfMs - start.startedAtPerfMs) * 1000) / 1000); - const status: DynamoToolStatus = event.isError ? "error" : "succeeded"; - const toolClass = start?.toolClass ?? getToolClass(event.toolName); - const outputBytes = getToolResultOutputBytes(event.result); - + const bytes = outputBytes(event.result); this.publisher.publish({ schema: "dynamo.request.trace.v1", event_type: event.isError ? "tool_error" : "tool_end", @@ -280,13 +233,13 @@ export class DynamoToolEventRelay { agent_context: agentContext, tool: { tool_call_id: event.toolCallId, - tool_class: toolClass, + tool_class: start?.toolClass ?? 
getToolClass(event.toolName), started_at_unix_ms: startedAtUnixMs, ended_at_unix_ms: endedAtUnixMs, - duration_ms: durationMs, - status, + duration_ms: start ? Math.max(0, Math.round((endedAtPerfMs - start.startedAtPerfMs) * 1000) / 1000) : 0, + status: event.isError ? "error" : "succeeded", ...(event.isError ? { error_type: "pi_tool_error" } : {}), - ...(outputBytes === undefined ? {} : { output_bytes: outputBytes }), + ...(bytes === undefined ? {} : { output_bytes: bytes }), }, }); } @@ -294,25 +247,16 @@ export class DynamoToolEventRelay { export async function registerDynamoToolEventRelay( pi: ExtensionAPI, - config: DynamoProviderRuntimeConfig, + config: DynamoConfig, relayConfig: DynamoToolRelayConfig = readDynamoToolRelayConfig(), socketFactory: ToolEventSocketFactory = createZeroMqPushSocket, ): Promise { - if (!relayConfig.endpoint) return undefined; - + if (!config.traceEnabled || !relayConfig.endpoint) return undefined; const publisher = new DynamoToolEventPublisher(relayConfig, socketFactory); await publisher.start(); const relay = new DynamoToolEventRelay(config, publisher); - - pi.on("tool_execution_start", (event, ctx) => { - relay.handleToolExecutionStart(event, ctx); - }); - pi.on("tool_execution_end", (event, ctx) => { - relay.handleToolExecutionEnd(event, ctx); - }); - pi.on("session_shutdown", () => { - publisher.close(); - }); - + pi.on("tool_execution_start", (event, ctx) => relay.handleToolExecutionStart(event, ctx)); + pi.on("tool_execution_end", (event, ctx) => relay.handleToolExecutionEnd(event, ctx)); + pi.on("session_shutdown", () => publisher.close()); return relay; } diff --git a/pi-plugin/src/light/trajectory.ts b/pi-plugin/src/light/trajectory.ts new file mode 100644 index 0000000..c8bc819 --- /dev/null +++ b/pi-plugin/src/light/trajectory.ts @@ -0,0 +1,68 @@ +// SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+// SPDX-License-Identifier: Apache-2.0 + +export interface DynamoTrajectoryEnvironment { + DYN_REQUEST_TRACE?: string; + DYN_AGENT_TRAJECTORY_ID?: string; + DYN_AGENT_PARENT_TRAJECTORY_ID?: string; + PI_SUBAGENT_CHILD?: string; + PI_SUBAGENT_RUN_ID?: string; + PI_SUBAGENT_CHILD_AGENT?: string; + PI_SUBAGENT_CHILD_INDEX?: string; +} + +export interface DynamoTrajectoryContext { + trajectoryId?: string; + parentTrajectoryId?: string; +} + +export function envValue(env: T, key: K): string | undefined { + const value = env[key]; + const trimmed = typeof value === "string" ? value.trim() : undefined; + return trimmed ? trimmed : undefined; +} + +export function isTruthyEnv(value: string | undefined): boolean { + return value ? ["1", "true", "yes", "on"].includes(value.toLowerCase()) : false; +} + +export function subagentTrajectoryId(env: DynamoTrajectoryEnvironment): string | undefined { + if (envValue(env, "PI_SUBAGENT_CHILD") !== "1") return undefined; + const runId = envValue(env, "PI_SUBAGENT_RUN_ID"); + const childAgent = envValue(env, "PI_SUBAGENT_CHILD_AGENT"); + if (!runId || !childAgent) return undefined; + return `${runId}:${childAgent}:${envValue(env, "PI_SUBAGENT_CHILD_INDEX") ?? "0"}`; +} + +export function resolveTrajectoryContext(env: DynamoTrajectoryEnvironment): DynamoTrajectoryContext { + const childTrajectoryId = subagentTrajectoryId(env); + if (childTrajectoryId) { + const parentTrajectoryId = + envValue(env, "DYN_AGENT_PARENT_TRAJECTORY_ID") ?? envValue(env, "DYN_AGENT_TRAJECTORY_ID"); + return { + trajectoryId: childTrajectoryId, + ...(parentTrajectoryId ? { parentTrajectoryId } : {}), + }; + } + + const trajectoryId = envValue(env, "DYN_AGENT_TRAJECTORY_ID"); + const parentTrajectoryId = envValue(env, "DYN_AGENT_PARENT_TRAJECTORY_ID"); + return { + ...(trajectoryId ? { trajectoryId } : {}), + ...(parentTrajectoryId ? 
{ parentTrajectoryId } : {}), + }; +} + +export function applySubagentTrajectoryBridge(env: NodeJS.ProcessEnv = process.env): boolean { + const context = resolveTrajectoryContext(env); + if (!subagentTrajectoryId(env) || !context.trajectoryId) return false; + if ( + envValue(env, "DYN_AGENT_TRAJECTORY_ID") === context.trajectoryId && + envValue(env, "DYN_AGENT_PARENT_TRAJECTORY_ID") === context.parentTrajectoryId + ) { + return false; + } + if (context.parentTrajectoryId) env.DYN_AGENT_PARENT_TRAJECTORY_ID = context.parentTrajectoryId; + env.DYN_AGENT_TRAJECTORY_ID = context.trajectoryId; + return true; +} diff --git a/test/integration/smoke.mjs b/pi-plugin/test/integration/smoke.mjs similarity index 70% rename from test/integration/smoke.mjs rename to pi-plugin/test/integration/smoke.mjs index 15130d1..9353e8d 100644 --- a/test/integration/smoke.mjs +++ b/pi-plugin/test/integration/smoke.mjs @@ -2,8 +2,8 @@ // SPDX-License-Identifier: Apache-2.0 // Integration smoke test: spins up a Dynamo frontend + mocker, sends one chat -// completion through pi-dynamo-provider's streamSimple wrapper, and asserts -// that nvext.agent_context fields round-trip into the JSONL request trace. +// completion through Dynamo and asserts that x-dynamo-trajectory-id becomes +// trajectory identity in the JSONL request trace. // // Not a unit test — runs out-of-band of vitest. Driven by // scripts/integration-smoke.sh which boots Dynamo, exports the trace sink env @@ -11,8 +11,8 @@ // transport failure. // // Assertions, in order: -// 1. agent_context fields we set as env vars appear verbatim in the trace -// 2. subagent bridge rewrites trajectory_id / parent_trajectory_id when +// 1. x-dynamo-trajectory-id becomes Dynamo agent_context trajectory_id +// 2. 
subagent bridge derives a child trajectory id when // PI_SUBAGENT_CHILD=1 + bookkeeping vars are exported // // Mocker output text is intentionally garbage; we never assert on response @@ -21,12 +21,7 @@ import { readFileSync, existsSync } from "node:fs"; import { setTimeout as delay } from "node:timers/promises"; -import { - buildDynamoAgentContext, - createDynamoStreamSimple, - DYNAMO_API, - readDynamoConfig, -} from "../../dist/dynamo-provider.js"; +import { readDynamoConfig } from "../../dist/light/provider.js"; const TRACE_PATH = mustEnv("DYN_REQUEST_TRACE_OUTPUT_PATH"); const BASE_URL = mustEnv("DYNAMO_BASE_URL"); @@ -69,18 +64,19 @@ async function waitForTraceMatching(predicate, label, timeoutMs = 15000) { throw new Error(`smoke: timed out waiting for trace event: ${label}`); } -async function postChat(agentContext, xRequestId) { +async function postChat({ trajectoryId, parentTrajectoryId, xRequestId }) { const body = { model: MODEL_ID, messages: [{ role: "user", content: "smoke" }], max_tokens: 4, stream: false, - nvext: { agent_context: agentContext }, }; const response = await fetch(`${BASE_URL}/chat/completions`, { method: "POST", headers: { "content-type": "application/json", + "x-dynamo-trajectory-id": trajectoryId, + ...(parentTrajectoryId ? { "x-dynamo-parent-trajectory-id": parentTrajectoryId } : {}), "x-request-id": xRequestId, authorization: `Bearer ${process.env.DYNAMO_API_KEY ?? 
"dynamo-local"}`, }, @@ -99,15 +95,10 @@ function assert(condition, message) { if (!condition) throw new Error(`smoke: assertion failed: ${message}`); } -async function caseTopLevelAgentContext() { +async function caseTopLevelSessionHeader() { const xRequestId = "smoke-toplevel-" + Date.now(); - const agentContext = { - session_type_id: "ci_smoke", - session_id: "smoke-session-toplevel", - trajectory_id: "smoke-traj-toplevel", - phase: "reasoning", - }; - await postChat(agentContext, xRequestId); + const trajectoryId = "smoke-session-toplevel"; + await postChat({ trajectoryId, xRequestId }); const event = await waitForTraceMatching( (e) => e.event_type === "request_end" && e.request?.x_request_id === xRequestId, @@ -116,15 +107,7 @@ async function caseTopLevelAgentContext() { assert(event.agent_context, "trace event missing agent_context"); assert( - event.agent_context.session_type_id === agentContext.session_type_id, - `session_type_id mismatch: got ${event.agent_context.session_type_id}`, - ); - assert( - event.agent_context.session_id === agentContext.session_id, - `session_id mismatch: got ${event.agent_context.session_id}`, - ); - assert( - event.agent_context.trajectory_id === agentContext.trajectory_id, + event.agent_context.trajectory_id === trajectoryId, `trajectory_id mismatch: got ${event.agent_context.trajectory_id}`, ); assert( @@ -132,18 +115,16 @@ async function caseTopLevelAgentContext() { event.agent_context.parent_trajectory_id === null, `parent_trajectory_id should be unset for top-level case`, ); - console.log(" PASS top-level agent_context round-trip"); + console.log(" PASS top-level trajectory_id from x-dynamo-trajectory-id"); } async function caseSubagentBridge() { // Simulate the env shape pi-subagents would set on a spawned child: // inherited DYN_AGENT_TRAJECTORY_ID (parent's id) plus PI_SUBAGENT_* bookkeeping. - // readDynamoConfig should rewrite both ids, and the rewritten values must - // land in the trace when streamSimple dispatches. 
+ // readDynamoConfig should rewrite both ids before the request is sent. const env = { DYNAMO_BASE_URL: BASE_URL, - DYN_AGENT_SESSION_TYPE_ID: "ci_smoke", - DYN_AGENT_SESSION_ID: "smoke-session-subagent", + DYN_REQUEST_TRACE: "1", DYN_AGENT_TRAJECTORY_ID: "smoke-orchestrator", PI_SUBAGENT_CHILD: "1", PI_SUBAGENT_RUN_ID: "smoke-run", @@ -161,8 +142,11 @@ async function caseSubagentBridge() { ); const xRequestId = "smoke-subagent-" + Date.now(); - const agentContext = buildDynamoAgentContext(config); - await postChat(agentContext, xRequestId); + await postChat({ + trajectoryId: config.trajectoryId, + parentTrajectoryId: config.parentTrajectoryId, + xRequestId, + }); const event = await waitForTraceMatching( (e) => e.event_type === "request_end" && e.request?.x_request_id === xRequestId, @@ -174,25 +158,15 @@ async function caseSubagentBridge() { event.agent_context.trajectory_id === "smoke-run:researcher:0", `subagent trajectory_id mismatch: got ${event.agent_context.trajectory_id}`, ); - assert( - event.agent_context.parent_trajectory_id === "smoke-orchestrator", - `subagent parent_trajectory_id mismatch: got ${event.agent_context.parent_trajectory_id}`, - ); - console.log(" PASS pi-subagents trajectory bridge round-trip"); + console.log(" PASS pi-subagents trajectory_id header"); } async function main() { - // Exercise the wrapper indirectly: streamSimple's injection path is unit - // tested elsewhere. Here we POST the same nvext shape it would produce so - // we're checking dynamo's receive side, not pi-ai's stream loop. 
- void createDynamoStreamSimple; - void DYNAMO_API; - console.log(`smoke: trace path = ${TRACE_PATH}`); console.log(`smoke: dynamo base = ${BASE_URL}`); console.log(`smoke: model = ${MODEL_ID}`); - await caseTopLevelAgentContext(); + await caseTopLevelSessionHeader(); await caseSubagentBridge(); console.log("smoke: all assertions passed"); diff --git a/pi-plugin/test/light.test.ts b/pi-plugin/test/light.test.ts new file mode 100644 index 0000000..668cfc4 --- /dev/null +++ b/pi-plugin/test/light.test.ts @@ -0,0 +1,121 @@ +// SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +import { decode } from "@msgpack/msgpack"; +import { createAssistantMessageEventStream, type Context, type Model, type SimpleStreamOptions } from "@mariozechner/pi-ai"; +import type { ExtensionContext } from "@mariozechner/pi-coding-agent"; +import { describe, expect, it } from "vitest"; +import { + applySubagentTrajectoryBridge, + buildToolAgentContext, + createDynamoStreamSimple, + DEFAULT_DYNAMO_BASE_URL, + DEFAULT_DYNAMO_MODEL_ID, + DynamoToolEventPublisher, + DynamoToolEventRelay, + DYNAMO_API, + readDynamoConfig, + type DynamoConfig, + type DynamoRequestTraceRecord, + type ToolEventSocket, +} from "../src/index.js"; + +const model = { + id: DEFAULT_DYNAMO_MODEL_ID, + name: "Default", + api: DYNAMO_API, + provider: "dynamo", + baseUrl: DEFAULT_DYNAMO_BASE_URL, + reasoning: false, + input: ["text"], + cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 }, + contextWindow: 128000, + maxTokens: 8192, +} satisfies Model; + +const context: Context = { messages: [] }; +const config: DynamoConfig = { + baseUrl: DEFAULT_DYNAMO_BASE_URL, + apiKey: "test-key", + traceEnabled: true, +}; + +class FakeToolEventSocket implements ToolEventSocket { + readonly sent: [Buffer, Buffer, Buffer][] = []; + async connect(_endpoint: string): Promise {} + async send(frames: [Buffer, Buffer, Buffer]): Promise { + 
this.sent.push(frames); + } + close(): void {} +} + +function createContext(sessionId: string): ExtensionContext { + return { sessionManager: { getSessionId: () => sessionId } } as unknown as ExtensionContext; +} + +describe("light provider", () => { + it("keeps Pi sessionId and adds Dynamo trajectory headers", () => { + let capturedOptions: SimpleStreamOptions | undefined; + const streamSimple = createDynamoStreamSimple( + config, + (_model, _context, options) => { + capturedOptions = options; + return createAssistantMessageEventStream(); + }, + () => "request-1", + ); + + streamSimple(model, context, { sessionId: "pi-session" }); + + expect(capturedOptions?.sessionId).toBe("pi-session"); + expect(capturedOptions?.headers).toEqual({ + "x-request-id": "request-1", + "x-dynamo-trajectory-id": "pi-session", + }); + }); + + it("bridges pi-subagents through Dynamo trajectory headers", () => { + const env: NodeJS.ProcessEnv = { + DYN_REQUEST_TRACE: "1", + DYN_AGENT_TRAJECTORY_ID: "parent", + PI_SUBAGENT_CHILD: "1", + PI_SUBAGENT_RUN_ID: "run", + PI_SUBAGENT_CHILD_AGENT: "researcher", + }; + expect(applySubagentTrajectoryBridge(env)).toBe(true); + const cfg = readDynamoConfig(env); + + let capturedOptions: SimpleStreamOptions | undefined; + createDynamoStreamSimple( + cfg, + (_model, _context, options) => { + capturedOptions = options; + return createAssistantMessageEventStream(); + }, + () => "request-1", + )(model, context, { sessionId: "pi-session" }); + + expect(capturedOptions?.sessionId).toBe("pi-session"); + expect(capturedOptions?.headers).toMatchObject({ + "x-dynamo-trajectory-id": "run:researcher:0", + "x-dynamo-parent-trajectory-id": "parent", + }); + }); + + it("emits trajectory-only ZMQ tool context", async () => { + const socket = new FakeToolEventSocket(); + const publisher = new DynamoToolEventPublisher( + { endpoint: "tcp://127.0.0.1:20390", topic: "tools", queueCapacity: 10 }, + () => socket, + ); + await publisher.start(); + const relay = new 
DynamoToolEventRelay(config, publisher, () => 1000, () => 10); + + relay.handleToolExecutionStart({ toolCallId: "call-1", toolName: "bash", args: {} }, createContext("pi-session")); + await publisher.flush(); + + const record = decode(socket.sent[0]?.[2] ?? Buffer.alloc(0)) as DynamoRequestTraceRecord; + expect(buildToolAgentContext(config, "pi-session")).toEqual({ trajectory_id: "pi-session" }); + expect(record.agent_context).toEqual({ trajectory_id: "pi-session" }); + }); +}); diff --git a/tsconfig.build.json b/pi-plugin/tsconfig.build.json similarity index 100% rename from tsconfig.build.json rename to pi-plugin/tsconfig.build.json diff --git a/tsconfig.json b/pi-plugin/tsconfig.json similarity index 100% rename from tsconfig.json rename to pi-plugin/tsconfig.json diff --git a/src/dynamo-provider.ts b/src/dynamo-provider.ts deleted file mode 100644 index 2aab234..0000000 --- a/src/dynamo-provider.ts +++ /dev/null @@ -1,422 +0,0 @@ -// SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. -// SPDX-License-Identifier: Apache-2.0 - -import { randomUUID } from "node:crypto"; -import { streamSimpleOpenAICompletions } from "@mariozechner/pi-ai"; -import type { - Api, - AssistantMessageEventStream, - Context, - Model, - OpenAICompletionsCompat, - SimpleStreamOptions, -} from "@mariozechner/pi-ai"; -import type { ProviderConfig, ProviderModelConfig } from "@mariozechner/pi-coding-agent"; - -export const DYNAMO_PROVIDER_ID = "dynamo"; -export const DYNAMO_API = "dynamo-openai-completions" satisfies Api; -export const DEFAULT_DYNAMO_BASE_URL = "http://127.0.0.1:8000/v1"; -export const DEFAULT_DYNAMO_API_KEY = "dynamo-local"; -export const DEFAULT_SESSION_TYPE_ID = "pi_coding_agent"; -export const DEFAULT_DYNAMO_MODEL_ID = "default"; - -export interface DynamoEnvironment { - DYNAMO_BASE_URL?: string; - OPENAI_BASE_URL?: string; - DYNAMO_API_KEY?: string; - // Master switch for the provider's request-trace emissions. 
When truthy the - // provider injects nvext.agent_context and (if an endpoint is set) the - // tool-event relay — all with sensible defaults that the more specific - // DYN_AGENT_* / DYNAMO_* vars below override. When unset/falsy the provider - // is just a plain `dynamo/` provider. - DYN_REQUEST_TRACE?: string; - DYN_AGENT_SESSION_TYPE_ID?: string; - DYN_AGENT_SESSION_ID?: string; - DYN_AGENT_TRAJECTORY_ID?: string; - DYN_AGENT_PARENT_TRAJECTORY_ID?: string; - // pi-subagents bookkeeping. pi-subagents spawns each child agent as a - // node child_process with `{ ...process.env, ...subagentEnv }`, so the - // parent's DYN_AGENT_TRAJECTORY_ID arrives in the child unchanged — - // under the wrong name. The bridge below reinterprets it. See - // `applySubagentBridge` and the README "Subagent trajectory linking". - PI_SUBAGENT_CHILD?: string; - PI_SUBAGENT_RUN_ID?: string; - PI_SUBAGENT_CHILD_AGENT?: string; - PI_SUBAGENT_CHILD_INDEX?: string; -} - -export interface DynamoProviderRuntimeConfig { - baseUrl: string; - apiKey: string; - // DYN_REQUEST_TRACE master switch. Gates agent_context and - // the tool relay; the model provider itself is registered regardless. - traceEnabled: boolean; - sessionTypeId: string; - sessionId?: string; - trajectoryId?: string; - parentTrajectoryId?: string; - isSubagent?: boolean; -} - -export interface DynamoAgentContext { - trajectory_id?: string; - parent_trajectory_id?: string; - session_id?: string; - session_type_id: string; - phase: "reasoning"; - // Terminal marker: the thunderagent_router releases the program when set. 
- trajectory_final?: boolean; -} - -interface OpenAIModelsResponse { - data?: Array<{ - id?: unknown; - }>; -} - -type OpenAICompletionsStreamSimple = ( - model: Model<"openai-completions">, - context: Context, - options?: SimpleStreamOptions, -) => AssistantMessageEventStream; - -type ProviderStreamSimple = NonNullable; - -function getEnvValue(env: DynamoEnvironment, key: keyof DynamoEnvironment): string | undefined { - const value = env[key]; - const trimmed = value?.trim(); - return trimmed ? trimmed : undefined; -} - -export function normalizeDynamoBaseUrl(rawBaseUrl: string | undefined): string { - const raw = rawBaseUrl?.trim() || DEFAULT_DYNAMO_BASE_URL; - const withoutTrailingSlash = raw.replace(/\/+$/, ""); - - try { - const url = new URL(withoutTrailingSlash); - if (url.pathname === "" || url.pathname === "/") { - url.pathname = "/v1"; - } - return url.toString().replace(/\/+$/, ""); - } catch { - return withoutTrailingSlash; - } -} - -function isTruthyEnv(value: string | undefined): boolean { - if (!value) return false; - return ["1", "true", "yes", "on"].includes(value.toLowerCase()); -} - -/** - * The subagent's stable trajectory id, derived purely from the pi-subagents - * `PI_SUBAGENT_*` bookkeeping. Returns `undefined` outside a pi-subagents child - * or when the identity is incomplete (no run id / agent name). - * - * `PI_SUBAGENT_CHILD_INDEX` defaults to `"0"` when absent. - */ -export function computeSubagentTrajectoryId(env: DynamoEnvironment): string | undefined { - if (getEnvValue(env, "PI_SUBAGENT_CHILD") !== "1") return undefined; - const runId = getEnvValue(env, "PI_SUBAGENT_RUN_ID"); - const childAgent = getEnvValue(env, "PI_SUBAGENT_CHILD_AGENT"); - if (!runId || !childAgent) return undefined; - const childIndex = getEnvValue(env, "PI_SUBAGENT_CHILD_INDEX") ?? 
"0"; - return `${runId}:${childAgent}:${childIndex}`; -} - -/** - * Compute the trajectory rewrite that pi-subagents inheritance implies, without - * mutating any caller-visible state. Pure: takes the raw env, returns either - * `null` (not a pi-subagents child) or the child's trajectory id plus optional - * parent id. - * - * `PI_SUBAGENT_CHILD === "1"` switches the current process identity from root - * to child: `trajectory_id` becomes the child id, and the inherited - * `DYN_AGENT_TRAJECTORY_ID` becomes `parent_trajectory_id` unless an explicit - * parent override is already present. - */ -export function computeSubagentTrajectoryRewrite( - env: DynamoEnvironment, -): { trajectoryId: string; parentTrajectoryId?: string } | null { - const trajectoryId = computeSubagentTrajectoryId(env); - if (!trajectoryId) return null; - const parentTrajectoryId = - getEnvValue(env, "DYN_AGENT_PARENT_TRAJECTORY_ID") ?? getEnvValue(env, "DYN_AGENT_TRAJECTORY_ID"); - return { - trajectoryId, - ...(parentTrajectoryId ? { parentTrajectoryId } : {}), - }; -} - -/** - * Apply the pi-subagents trajectory rewrite to `process.env` so subsequent - * pi-subagents spawns inherit this generation's synthesized trajectory_id as - * their parent. Without this, nested subagent chains collapse — every - * generation would observe the original grandparent as its parent and the - * middle generations would be invisible in the dynamo trace. - * - * Idempotent: a second call has no effect once the env already contains the - * computed child trajectory and parent link. Safe to invoke from extension init. - * - * Mutates the supplied env object in place (defaults to `process.env`); also - * returns whether a rewrite was applied so callers can log/test. 
- */ -export function applySubagentBridge(env: NodeJS.ProcessEnv = process.env): boolean { - const rewrite = computeSubagentTrajectoryRewrite(env); - if (!rewrite) return false; - if ( - getEnvValue(env, "DYN_AGENT_TRAJECTORY_ID") === rewrite.trajectoryId && - getEnvValue(env, "DYN_AGENT_PARENT_TRAJECTORY_ID") === rewrite.parentTrajectoryId - ) { - return false; - } - if (rewrite.parentTrajectoryId) env.DYN_AGENT_PARENT_TRAJECTORY_ID = rewrite.parentTrajectoryId; - env.DYN_AGENT_TRAJECTORY_ID = rewrite.trajectoryId; - return true; -} - -/** - * Seed a root trajectory id so spawned pi-subagents have a parent to inherit. - * `applySubagentBridge` only fires when a child inherits a non-empty - * `DYN_AGENT_TRAJECTORY_ID`; if the root never sets one, the first generation of - * subagents inherits nothing, the bridge no-ops, and the whole chain stays flat - * (no `parent_trajectory_id`). Only the ROOT seeds — a pi-subagents child already - * inherits its parent's id, and a caller-set id wins. Uses `DYN_AGENT_SESSION_ID` - * when present (root trajectory == its session) else a fresh id. Gated on - * `DYN_REQUEST_TRACE`. Mutates env in place; must run before any subagent spawn. - * Returns whether a seed was written. - */ -export function seedRootTrajectory( - env: NodeJS.ProcessEnv = process.env, - mkId: () => string = randomUUID, -): boolean { - if (!isTruthyEnv(getEnvValue(env, "DYN_REQUEST_TRACE"))) return false; - if (getEnvValue(env, "PI_SUBAGENT_CHILD") === "1") return false; - if (getEnvValue(env, "DYN_AGENT_TRAJECTORY_ID")) return false; - env.DYN_AGENT_TRAJECTORY_ID = getEnvValue(env, "DYN_AGENT_SESSION_ID") ?? mkId(); - return true; -} - -export function readDynamoConfig(env: DynamoEnvironment = process.env): DynamoProviderRuntimeConfig { - const rewrite = computeSubagentTrajectoryRewrite(env); - const sessionId = getEnvValue(env, "DYN_AGENT_SESSION_ID"); - const trajectoryId = rewrite?.trajectoryId ?? 
getEnvValue(env, "DYN_AGENT_TRAJECTORY_ID"); - const parentTrajectoryId = - rewrite?.parentTrajectoryId ?? getEnvValue(env, "DYN_AGENT_PARENT_TRAJECTORY_ID"); - - return { - baseUrl: normalizeDynamoBaseUrl(getEnvValue(env, "DYNAMO_BASE_URL") ?? getEnvValue(env, "OPENAI_BASE_URL")), - apiKey: getEnvValue(env, "DYNAMO_API_KEY") ?? DEFAULT_DYNAMO_API_KEY, - traceEnabled: isTruthyEnv(getEnvValue(env, "DYN_REQUEST_TRACE")), - sessionTypeId: getEnvValue(env, "DYN_AGENT_SESSION_TYPE_ID") ?? DEFAULT_SESSION_TYPE_ID, - ...(sessionId ? { sessionId } : {}), - ...(trajectoryId ? { trajectoryId } : {}), - ...(parentTrajectoryId ? { parentTrajectoryId } : {}), - isSubagent: rewrite !== null, - }; -} - -export function buildDynamoAgentContext( - config: DynamoProviderRuntimeConfig, - options?: Pick, -): DynamoAgentContext { - // session_id and trajectory_id both default to Pi's own session id when not - // pinned via DYN_AGENT_*. Dynamo's AgentContext requires session_id, so a - // default keeps the payload valid with zero operator env beyond DYN_REQUEST_TRACE. - const trajectoryId = config.trajectoryId ?? options?.sessionId; - const sessionId = config.sessionId ?? options?.sessionId; - return { - ...(trajectoryId ? { trajectory_id: trajectoryId } : {}), - ...(config.parentTrajectoryId ? { parent_trajectory_id: config.parentTrajectoryId } : {}), - ...(sessionId ? { session_id: sessionId } : {}), - session_type_id: config.sessionTypeId, - phase: "reasoning", - }; -} - -function isRecord(value: unknown): value is Record { - return typeof value === "object" && value !== null && !Array.isArray(value); -} - -export function mergeDynamoAgentContext(payload: unknown, agentContext: DynamoAgentContext): unknown { - const payloadRecord = isRecord(payload) ? payload : {}; - const existingNvext = isRecord(payloadRecord.nvext) ? payloadRecord.nvext : {}; - const existingAgentContext = isRecord(existingNvext.agent_context) ? 
existingNvext.agent_context : {}; - - return { - ...payloadRecord, - nvext: { - ...existingNvext, - agent_context: { - ...agentContext, - ...existingAgentContext, - }, - }, - }; -} - -function hasHeader(headers: Record, target: string): boolean { - const normalizedTarget = target.toLowerCase(); - return Object.keys(headers).some((key) => key.toLowerCase() === normalizedTarget); -} - -export function buildDynamoHeaders( - headers: Record | undefined, - createRequestId: () => string = randomUUID, -): Record { - const nextHeaders = { ...headers }; - if (!hasHeader(nextHeaders, "x-request-id")) { - nextHeaders["x-request-id"] = createRequestId(); - } - return nextHeaders; -} - -const dynamoOpenAICompat = { - supportsStore: false, - supportsDeveloperRole: false, - supportsReasoningEffort: false, - supportsUsageInStreaming: true, - maxTokensField: "max_tokens", - supportsStrictMode: false, - supportsLongCacheRetention: false, -} satisfies OpenAICompletionsCompat; - -export function createDynamoModels(modelIds: string[], baseUrl: string): ProviderModelConfig[] { - const ids = modelIds.length > 0 ? modelIds : [DEFAULT_DYNAMO_MODEL_ID]; - return ids.map((id) => ({ - id, - name: id, - api: DYNAMO_API, - baseUrl, - reasoning: false, - input: ["text"], - cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 }, - contextWindow: 128000, - maxTokens: 8192, - compat: dynamoOpenAICompat, - })); -} - -export async function discoverDynamoModels( - config: DynamoProviderRuntimeConfig, - options: { timeoutMs?: number } = {}, -): Promise { - const controller = new AbortController(); - const timeout = setTimeout(() => controller.abort(), options.timeoutMs ?? 
2000); - try { - const response = await fetch(`${config.baseUrl}/models`, { - headers: { - Authorization: `Bearer ${config.apiKey}`, - }, - signal: controller.signal, - }); - if (!response.ok) { - return []; - } - - const body = (await response.json()) as OpenAIModelsResponse; - const modelIds = - body.data - ?.map((model) => model.id) - .filter((id): id is string => typeof id === "string" && id.length > 0) ?? []; - return createDynamoModels([...new Set(modelIds)], config.baseUrl); - } catch { - return []; - } finally { - clearTimeout(timeout); - } -} - -function toOpenAICompletionsModel(model: Model): Model<"openai-completions"> { - const { api: _api, compat, ...rest } = model; - return { - ...rest, - api: "openai-completions", - compat: (compat as OpenAICompletionsCompat | undefined) ?? dynamoOpenAICompat, - }; -} - -type FetchLike = (input: string, init: RequestInit) => Promise<{ ok: boolean; status: number }>; - -export async function sendTrajectoryFinal( - config: DynamoProviderRuntimeConfig, - modelId: string, - createRequestId: () => string = randomUUID, - fetchImpl: FetchLike = fetch, -): Promise { - const agentContext = { ...buildDynamoAgentContext(config), trajectory_final: true }; - if (!agentContext.trajectory_id) return false; - const finalModelId = modelId.trim() || DEFAULT_DYNAMO_MODEL_ID; - try { - const response = await fetchImpl(`${config.baseUrl}/chat/completions`, { - method: "POST", - headers: { - "content-type": "application/json", - authorization: `Bearer ${config.apiKey}`, - "x-request-id": createRequestId(), - }, - body: JSON.stringify({ - model: finalModelId, - messages: [{ role: "user", content: "." 
}], - max_tokens: 1, - stream: false, - nvext: { agent_context: agentContext }, - }), - signal: AbortSignal.timeout(5000), - }); - return response.ok; - } catch { - return false; - } -} - -export function createDynamoStreamSimple( - config: DynamoProviderRuntimeConfig, - delegate: OpenAICompletionsStreamSimple = streamSimpleOpenAICompletions, - createRequestId: () => string = randomUUID, -): ProviderStreamSimple { - return (model: Model, context: Context, options?: SimpleStreamOptions): AssistantMessageEventStream => { - const runtimeSessionId = options?.sessionId?.trim(); - if (!config.sessionId && runtimeSessionId) { - config.sessionId = runtimeSessionId; - } - const openAIModel = toOpenAICompletionsModel(model); - const headers = buildDynamoHeaders(options?.headers, createRequestId); - const baseOptions: SimpleStreamOptions = { - ...options, - apiKey: options?.apiKey ?? config.apiKey, - headers, - }; - - // DYN_REQUEST_TRACE off: behave as a plain dynamo/ provider — still - // add x-request-id for correlation, but inject no agentic nvext. - if (!config.traceEnabled) { - return delegate(openAIModel, context, baseOptions); - } - - const agentContext = buildDynamoAgentContext(config, options); - const previousOnPayload = options?.onPayload; - - return delegate(openAIModel, context, { - ...baseOptions, - onPayload: async (payload) => { - const injectedPayload = mergeDynamoAgentContext(payload, agentContext); - return (await previousOnPayload?.(injectedPayload, model)) ?? 
injectedPayload; - }, - }); - }; -} - -export function createDynamoProviderConfig( - config: DynamoProviderRuntimeConfig, - models: ProviderModelConfig[], -): ProviderConfig { - return { - name: "Dynamo", - baseUrl: config.baseUrl, - apiKey: config.apiKey, - api: DYNAMO_API, - models, - streamSimple: createDynamoStreamSimple(config), - }; -} diff --git a/src/index.ts b/src/index.ts deleted file mode 100644 index c53814a..0000000 --- a/src/index.ts +++ /dev/null @@ -1,65 +0,0 @@ -// SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. -// SPDX-License-Identifier: Apache-2.0 - -import { randomUUID } from "node:crypto"; -import type { ExtensionAPI } from "@mariozechner/pi-coding-agent"; -import { - applySubagentBridge, - createDynamoModels, - createDynamoProviderConfig, - DEFAULT_DYNAMO_MODEL_ID, - DYNAMO_PROVIDER_ID, - discoverDynamoModels, - readDynamoConfig, - seedRootTrajectory, - sendTrajectoryFinal, -} from "./dynamo-provider.js"; -import { registerDynamoToolEventRelay } from "./tool-relay.js"; - -export default async function dynamoProviderExtension(pi: ExtensionAPI): Promise { - // Seed a root trajectory id (root only) BEFORE anything spawns subagents, so - // the first generation of pi-subagents has a parent to inherit; without it the - // bridge no-ops and the whole chain stays flat (no parent_trajectory_id). - seedRootTrajectory(); - // Mutate process.env BEFORE readDynamoConfig so the rewrite also reaches - // any pi-subagents this process later spawns. readDynamoConfig itself - // recomputes the rewrite independently, so omitting this call still - // yields a correct config for THIS process — but nested subagent chains - // collapse to the root parent without the env mutation. - applySubagentBridge(); - const config = readDynamoConfig(); - const discoveredModels = await discoverDynamoModels(config); - const models = - discoveredModels.length > 0 ? 
discoveredModels : createDynamoModels([DEFAULT_DYNAMO_MODEL_ID], config.baseUrl); - const closeModelId = models.map((model) => model.id.trim()).find((id) => id.length > 0) ?? DEFAULT_DYNAMO_MODEL_ID; - const providerModels = models.map((model) => ({ ...model })); - - pi.registerProvider(DYNAMO_PROVIDER_ID, createDynamoProviderConfig(config, providerModels)); - if (config.traceEnabled) { - await registerDynamoToolEventRelay(pi, config); - } - - // trajectory_final closes the current trajectory: every subagent on agent_end, - // the root only on true quit. Other session_shutdown reasons keep the same - // trajectory alive across reload/fork/new/resume flows. - const programTrajectoryId = config.trajectoryId ?? config.sessionId; - if (config.traceEnabled && programTrajectoryId) { - let programClosed = false; - const closeProgram = async (): Promise => { - if (programClosed) return; - programClosed = true; - await sendTrajectoryFinal(config, closeModelId, randomUUID); - }; - if (config.isSubagent) { - pi.on("agent_end", closeProgram); - pi.on("session_shutdown", closeProgram); - } else { - pi.on("session_shutdown", async (event) => { - if (event.reason === "quit") await closeProgram(); - }); - } - } -} - -export * from "./dynamo-provider.js"; -export * from "./tool-relay.js"; diff --git a/test/dynamo-provider.test.ts b/test/dynamo-provider.test.ts deleted file mode 100644 index b5694c8..0000000 --- a/test/dynamo-provider.test.ts +++ /dev/null @@ -1,468 +0,0 @@ -// SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
-// SPDX-License-Identifier: Apache-2.0 - -import { createAssistantMessageEventStream, type Context, type Model, type SimpleStreamOptions } from "@mariozechner/pi-ai"; -import { describe, expect, it } from "vitest"; -import { - applySubagentBridge, - buildDynamoAgentContext, - buildDynamoHeaders, - computeSubagentTrajectoryId, - computeSubagentTrajectoryRewrite, - createDynamoStreamSimple, - DEFAULT_DYNAMO_BASE_URL, - DEFAULT_DYNAMO_MODEL_ID, - DEFAULT_SESSION_TYPE_ID, - type DynamoProviderRuntimeConfig, - DYNAMO_API, - mergeDynamoAgentContext, - normalizeDynamoBaseUrl, - readDynamoConfig, - seedRootTrajectory, - sendTrajectoryFinal, -} from "../src/dynamo-provider.js"; - -// Spread `base` with the given keys dropped (env-absent). Avoids the -// exactOptionalPropertyTypes friction of `{ ...base, KEY: undefined }`, which TS -// rejects because an explicit `undefined` is not assignable to an optional -// `string` property. -function envWithout>(base: T, ...keys: (keyof T)[]): Partial { - const copy: Partial = { ...base }; - for (const key of keys) delete copy[key]; - return copy; -} - -const config = { - baseUrl: DEFAULT_DYNAMO_BASE_URL, - apiKey: "test-key", - traceEnabled: true, - sessionTypeId: DEFAULT_SESSION_TYPE_ID, - isSubagent: false, -}; - -const model = { - id: DEFAULT_DYNAMO_MODEL_ID, - name: "Default", - api: DYNAMO_API, - provider: "dynamo", - baseUrl: DEFAULT_DYNAMO_BASE_URL, - reasoning: false, - input: ["text"], - cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 }, - contextWindow: 128000, - maxTokens: 8192, -} satisfies Model; - -const context: Context = { - messages: [], -}; - -describe("dynamo provider config", () => { - it("normalizes bare endpoint roots to /v1", () => { - expect(normalizeDynamoBaseUrl("http://127.0.0.1:8000")).toBe("http://127.0.0.1:8000/v1"); - expect(normalizeDynamoBaseUrl("http://127.0.0.1:8000/v1/")).toBe("http://127.0.0.1:8000/v1"); - }); - - it("reads env values with Dynamo precedence", () => { - expect( - 
readDynamoConfig({ - OPENAI_BASE_URL: "http://ignored.test/v1", - DYNAMO_BASE_URL: "http://dynamo.test", - DYNAMO_API_KEY: "dyn-key", - DYN_REQUEST_TRACE: "1", - DYN_AGENT_SESSION_TYPE_ID: "session-kind", - DYN_AGENT_SESSION_ID: "session-id", - DYN_AGENT_TRAJECTORY_ID: "trajectory-id", - DYN_AGENT_PARENT_TRAJECTORY_ID: "parent-id", - }), - ).toEqual({ - baseUrl: "http://dynamo.test/v1", - apiKey: "dyn-key", - traceEnabled: true, - sessionTypeId: "session-kind", - sessionId: "session-id", - trajectoryId: "trajectory-id", - parentTrajectoryId: "parent-id", - isSubagent: false, - }); - }); - - it("treats DYN_REQUEST_TRACE as a truthy master switch, default off", () => { - expect(readDynamoConfig({}).traceEnabled).toBe(false); - for (const v of ["1", "true", "TRUE", "yes", "on"]) { - expect(readDynamoConfig({ DYN_REQUEST_TRACE: v }).traceEnabled).toBe(true); - } - for (const v of ["0", "false", "no", ""]) { - expect(readDynamoConfig({ DYN_REQUEST_TRACE: v }).traceEnabled).toBe(false); - } - }); -}); - -describe("pi-subagents trajectory bridge", () => { - const childEnv = { - DYN_AGENT_TRAJECTORY_ID: "parent-traj", - PI_SUBAGENT_CHILD: "1", - PI_SUBAGENT_RUN_ID: "run-1", - PI_SUBAGENT_CHILD_AGENT: "researcher", - PI_SUBAGENT_CHILD_INDEX: "2", - } as const; - - it("reinterprets inherited DYN_AGENT_TRAJECTORY_ID as parent when in a subagent child", () => { - expect(computeSubagentTrajectoryRewrite(childEnv)).toEqual({ - parentTrajectoryId: "parent-traj", - trajectoryId: "run-1:researcher:2", - }); - }); - - it("defaults PI_SUBAGENT_CHILD_INDEX to 0 when absent", () => { - const { PI_SUBAGENT_CHILD_INDEX: _omit, ...envWithoutIndex } = childEnv; - expect(computeSubagentTrajectoryRewrite(envWithoutIndex)).toEqual({ - parentTrajectoryId: "parent-traj", - trajectoryId: "run-1:researcher:0", - }); - }); - - it("skips the bridge when PI_SUBAGENT_CHILD is not 1", () => { - expect(computeSubagentTrajectoryRewrite(envWithout(childEnv, "PI_SUBAGENT_CHILD"))).toBeNull(); - }); - - 
it("uses an explicit DYN_AGENT_PARENT_TRAJECTORY_ID when present (manual wins)", () => { - expect( - computeSubagentTrajectoryRewrite({ ...childEnv, DYN_AGENT_PARENT_TRAJECTORY_ID: "manual-parent" }), - ).toEqual({ - parentTrajectoryId: "manual-parent", - trajectoryId: "run-1:researcher:2", - }); - }); - - it("still creates a child trajectory when inherited DYN_AGENT_TRAJECTORY_ID is absent", () => { - expect(computeSubagentTrajectoryRewrite(envWithout(childEnv, "DYN_AGENT_TRAJECTORY_ID"))).toEqual({ - trajectoryId: "run-1:researcher:2", - }); - }); - - it("skips when PI_SUBAGENT_RUN_ID or PI_SUBAGENT_CHILD_AGENT is missing", () => { - expect(computeSubagentTrajectoryRewrite(envWithout(childEnv, "PI_SUBAGENT_RUN_ID"))).toBeNull(); - expect(computeSubagentTrajectoryRewrite(envWithout(childEnv, "PI_SUBAGENT_CHILD_AGENT"))).toBeNull(); - }); - - it("readDynamoConfig surfaces the synthesized ids", () => { - const cfg = readDynamoConfig(childEnv); - expect(cfg.trajectoryId).toBe("run-1:researcher:2"); - expect(cfg.parentTrajectoryId).toBe("parent-traj"); - expect(cfg.isSubagent).toBe(true); - }); - - it("applySubagentBridge mutates process.env so nested spawns chain correctly", () => { - const env: NodeJS.ProcessEnv = { ...childEnv }; - expect(applySubagentBridge(env)).toBe(true); - expect(env.DYN_AGENT_TRAJECTORY_ID).toBe("run-1:researcher:2"); - expect(env.DYN_AGENT_PARENT_TRAJECTORY_ID).toBe("parent-traj"); - - // Idempotent: a second call sees the now-set parent and short-circuits. - expect(applySubagentBridge(env)).toBe(false); - expect(env.DYN_AGENT_TRAJECTORY_ID).toBe("run-1:researcher:2"); - - // Chaining: when this grandchild spawns its own subagent, pi-subagents - // passes { ...process.env, ...subagentEnv }. The grandchild then sees - // its own synthesized id as inherited DYN_AGENT_TRAJECTORY_ID, so the - // next rewrite treats THIS generation as the parent. 
- const grandchildEnv = { - ...envWithout(env, "DYN_AGENT_PARENT_TRAJECTORY_ID"), - PI_SUBAGENT_CHILD_AGENT: "subworker", - PI_SUBAGENT_CHILD_INDEX: "0", - }; - expect(computeSubagentTrajectoryRewrite(grandchildEnv)).toEqual({ - parentTrajectoryId: "run-1:researcher:2", - trajectoryId: "run-1:subworker:0", - }); - }); -}); - -describe("root trajectory seed", () => { - it("seeds DYN_AGENT_TRAJECTORY_ID at the root so subagents inherit a parent", () => { - const env: NodeJS.ProcessEnv = { DYN_REQUEST_TRACE: "1" }; - expect(seedRootTrajectory(env, () => "root-traj")).toBe(true); - expect(env.DYN_AGENT_TRAJECTORY_ID).toBe("root-traj"); - // The bug fix: a subagent spawned from this env now resolves a parent. - const childEnv = { - ...env, - PI_SUBAGENT_CHILD: "1", - PI_SUBAGENT_RUN_ID: "run-1", - PI_SUBAGENT_CHILD_AGENT: "researcher", - }; - expect(computeSubagentTrajectoryRewrite(childEnv)).toEqual({ - parentTrajectoryId: "root-traj", - trajectoryId: "run-1:researcher:0", - }); - }); - - it("uses DYN_AGENT_SESSION_ID as the root trajectory when present", () => { - const env: NodeJS.ProcessEnv = { DYN_REQUEST_TRACE: "1", DYN_AGENT_SESSION_ID: "sess-7" }; - expect(seedRootTrajectory(env, () => "unused")).toBe(true); - expect(env.DYN_AGENT_TRAJECTORY_ID).toBe("sess-7"); - }); - - it("no-ops when trace is off, in a subagent child, or trajectory already set", () => { - expect(seedRootTrajectory({}, () => "x")).toBe(false); - expect(seedRootTrajectory({ DYN_REQUEST_TRACE: "1", PI_SUBAGENT_CHILD: "1" }, () => "x")).toBe(false); - const preset: NodeJS.ProcessEnv = { DYN_REQUEST_TRACE: "1", DYN_AGENT_TRAJECTORY_ID: "caller" }; - expect(seedRootTrajectory(preset, () => "x")).toBe(false); - expect(preset.DYN_AGENT_TRAJECTORY_ID).toBe("caller"); - }); -}); - -describe("agent context injection", () => { - it("defaults both trajectory_id and session_id to the Pi session ID", () => { - expect(buildDynamoAgentContext(config, { sessionId: "pi-session" })).toEqual({ - trajectory_id: 
"pi-session", - session_id: "pi-session", - session_type_id: DEFAULT_SESSION_TYPE_ID, - phase: "reasoning", - }); - }); - - it("lets DYN_AGENT_* override the Pi-session defaults", () => { - expect( - buildDynamoAgentContext( - { ...config, trajectoryId: "trajectory-from-env", sessionId: "session-from-env" }, - { sessionId: "pi-session" }, - ), - ).toEqual({ - trajectory_id: "trajectory-from-env", - session_id: "session-from-env", - session_type_id: DEFAULT_SESSION_TYPE_ID, - phase: "reasoning", - }); - }); - - it("merges nvext.agent_context without dropping existing nvext fields", () => { - const payload = mergeDynamoAgentContext( - { - model: "demo", - nvext: { - extra_fields: ["worker_id", "timing"], - agent_context: { - session_id: "existing-session", - custom_field: "kept", - }, - }, - }, - { - trajectory_id: "trajectory", - session_id: "default-session", - session_type_id: DEFAULT_SESSION_TYPE_ID, - phase: "reasoning", - }, - ); - - expect(payload).toEqual({ - model: "demo", - nvext: { - extra_fields: ["worker_id", "timing"], - agent_context: { - trajectory_id: "trajectory", - session_id: "existing-session", - session_type_id: DEFAULT_SESSION_TYPE_ID, - phase: "reasoning", - custom_field: "kept", - }, - }, - }); - }); -}); - -describe("request headers", () => { - it("sets x-request-id when absent", () => { - expect(buildDynamoHeaders(undefined, () => "request-1")).toEqual({ "x-request-id": "request-1" }); - }); - - it("preserves an existing x-request-id header regardless of casing", () => { - expect(buildDynamoHeaders({ "X-Request-Id": "provided" }, () => "request-1")).toEqual({ - "X-Request-Id": "provided", - }); - }); -}); - -describe("streamSimple wrapper", () => { - it("delegates through openai-completions with injected payload and headers", async () => { - let capturedModel: Model<"openai-completions"> | undefined; - let capturedOptions: SimpleStreamOptions | undefined; - const runtimeConfig: DynamoProviderRuntimeConfig = { ...config }; - - const 
streamSimple = createDynamoStreamSimple( - runtimeConfig, - (openAIModel, _context, options) => { - capturedModel = openAIModel; - capturedOptions = options; - return createAssistantMessageEventStream(); - }, - () => "request-1", - ); - - streamSimple(model, context, { - sessionId: "pi-session", - onPayload: (payload) => payload, - }); - - const onPayload = capturedOptions?.onPayload; - if (!onPayload) { - throw new Error("expected wrapped onPayload"); - } - const injectedPayload = await onPayload({ model: "default" }, model); - - expect(capturedModel?.api).toBe("openai-completions"); - expect(capturedModel?.provider).toBe("dynamo"); - expect(runtimeConfig.sessionId).toBe("pi-session"); - expect(capturedOptions?.apiKey).toBe("test-key"); - expect(capturedOptions?.headers).toEqual({ "x-request-id": "request-1" }); - expect(injectedPayload).toEqual({ - model: "default", - nvext: { - agent_context: { - trajectory_id: "pi-session", - session_id: "pi-session", - session_type_id: DEFAULT_SESSION_TYPE_ID, - phase: "reasoning", - }, - }, - }); - }); - - it("injects nothing when DYN_REQUEST_TRACE is off (plain provider), but still sets x-request-id", async () => { - let capturedOptions: SimpleStreamOptions | undefined; - const streamSimple = createDynamoStreamSimple( - { ...config, traceEnabled: false }, - (_model, _context, options) => { - capturedOptions = options; - return createAssistantMessageEventStream(); - }, - () => "request-1", - ); - - streamSimple(model, context, { sessionId: "pi-session" }); - expect(capturedOptions?.headers).toEqual({ "x-request-id": "request-1" }); - // No onPayload wrapper means no nvext injection. - const payload = { model: "default" }; - expect((await capturedOptions?.onPayload?.(payload, model)) ?? 
payload).toEqual({ model: "default" }); - }); -}); - -describe("subagent trajectory context", () => { - const subagentEnv = { - DYNAMO_BASE_URL: "http://dynamo.test", - DYN_AGENT_TRAJECTORY_ID: "orchestrator", - PI_SUBAGENT_CHILD: "1", - PI_SUBAGENT_RUN_ID: "run-1", - PI_SUBAGENT_CHILD_AGENT: "scout", - PI_SUBAGENT_CHILD_INDEX: "3", - } as const; - - it("sets child trajectory only for a pi-subagents child", () => { - expect(computeSubagentTrajectoryId(subagentEnv)).toBe("run-1:scout:3"); - const { PI_SUBAGENT_CHILD: _omit, ...leadEnv } = subagentEnv; - expect(computeSubagentTrajectoryId(leadEnv)).toBeUndefined(); - }); - - it("derives child trajectory from PI_SUBAGENT_* alone", () => { - const noTrajectory = envWithout(subagentEnv, "DYN_AGENT_TRAJECTORY_ID"); - const cfg = readDynamoConfig(noTrajectory); - expect(cfg.trajectoryId).toBe("run-1:scout:3"); - expect(cfg.parentTrajectoryId).toBeUndefined(); - expect(cfg.isSubagent).toBe(true); - }); - - it("requires a complete subagent identity (run id + agent name)", () => { - expect(computeSubagentTrajectoryId(envWithout(subagentEnv, "PI_SUBAGENT_RUN_ID"))).toBeUndefined(); - expect(computeSubagentTrajectoryId(envWithout(subagentEnv, "PI_SUBAGENT_CHILD_AGENT"))).toBeUndefined(); - // Index defaults to 0 when absent. 
- expect(computeSubagentTrajectoryId(envWithout(subagentEnv, "PI_SUBAGENT_CHILD_INDEX"))).toBe("run-1:scout:0"); - }); - - it("trajectory_final sends agent_context only", async () => { - const calls: Array<{ url: string; body: unknown; headers: unknown }> = []; - const fakeFetch = async (url: string, init: RequestInit) => { - calls.push({ - url, - body: JSON.parse(String(init.body)), - headers: init.headers, - }); - return { ok: true, status: 200 }; - }; - - const cfg = readDynamoConfig({ ...subagentEnv, DYN_REQUEST_TRACE: "1", DYN_AGENT_SESSION_ID: "run-1" }); - expect(await sendTrajectoryFinal(cfg, "zai-org/GLM-4.7-Flash", () => "close-req-1", fakeFetch)).toBe(true); - expect(calls).toHaveLength(1); - expect(calls[0]?.url).toBe("http://dynamo.test/v1/chat/completions"); - expect(calls[0]?.body).toEqual({ - model: "zai-org/GLM-4.7-Flash", - messages: [{ role: "user", content: "." }], - max_tokens: 1, - stream: false, - nvext: { - agent_context: { - trajectory_id: "run-1:scout:3", - parent_trajectory_id: "orchestrator", - session_id: "run-1", - session_type_id: DEFAULT_SESSION_TYPE_ID, - phase: "reasoning", - trajectory_final: true, - }, - }, - }); - expect((calls[0]?.headers as Record)["x-request-id"]).toBe("close-req-1"); - }); - - it("reuses Pi's runtime session id for subagent trajectory_final", async () => { - const calls: Array<{ body: any }> = []; - const cfg = readDynamoConfig({ ...subagentEnv, DYN_REQUEST_TRACE: "1" }); - const streamSimple = createDynamoStreamSimple( - cfg, - (_model, _context, _options) => createAssistantMessageEventStream(), - () => "request-1", - ); - streamSimple(model, context, { sessionId: "pi-child-session" }); - - const fakeFetch = async (_url: string, init: RequestInit) => { - calls.push({ body: JSON.parse(String(init.body)) }); - return { ok: true, status: 200 }; - }; - - expect(await sendTrajectoryFinal(cfg, "zai-org/GLM-4.7-Flash", () => "close-req-1", fakeFetch)).toBe(true); - 
expect(calls[0]?.body.nvext.agent_context).toMatchObject({ - trajectory_id: "run-1:scout:3", - parent_trajectory_id: "orchestrator", - session_id: "pi-child-session", - trajectory_final: true, - }); - }); - - it("streamSimple injects subagent agent_context without session_control", async () => { - let capturedOptions: SimpleStreamOptions | undefined; - const subagentConfig = readDynamoConfig({ ...subagentEnv, DYN_REQUEST_TRACE: "1", DYN_AGENT_SESSION_ID: "run-1" }); - const streamSimple = createDynamoStreamSimple( - subagentConfig, - (_model, _context, options) => { - capturedOptions = options; - return createAssistantMessageEventStream(); - }, - () => "request-1", - ); - - streamSimple(model, context, { sessionId: "pi-session" }); - const onPayload = capturedOptions?.onPayload; - if (!onPayload) throw new Error("expected wrapped onPayload"); - const injected = (await onPayload({ model: DEFAULT_DYNAMO_MODEL_ID }, model)) as { - nvext: { agent_context: unknown; session_control?: unknown }; - }; - - expect(injected.nvext.agent_context).toEqual({ - trajectory_id: "run-1:scout:3", - parent_trajectory_id: "orchestrator", - session_id: "run-1", - session_type_id: DEFAULT_SESSION_TYPE_ID, - phase: "reasoning", - }); - expect(injected.nvext.session_control).toBeUndefined(); - }); -}); diff --git a/test/program-close.test.ts b/test/program-close.test.ts deleted file mode 100644 index 1178686..0000000 --- a/test/program-close.test.ts +++ /dev/null @@ -1,139 +0,0 @@ -// SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
-// SPDX-License-Identifier: Apache-2.0 - -import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; -import dynamoProviderExtension from "../src/index.js"; - -// Multiturn program-close contract: the trajectory_final release must fire ONCE -// at true session teardown (session_shutdown reason "quit") — never per user turn -// (agent_end) and never on continuation reasons (reload/fork/new/resume), which -// keep the same trajectory_id alive. - -type Handler = (event: any, ctx?: any) => unknown | Promise; - -function makePi(onRegisterProvider?: (providerConfig: any) => void) { - const handlers: Record = {}; - const pi = { - registerProvider: vi.fn((_id: string, providerConfig: any) => { - onRegisterProvider?.(providerConfig); - }), - on: (event: string, handler: Handler) => { - handlers[event] = handler; - }, - }; - return { pi, handlers }; -} - -// Capture POST /chat/completions close pings; answer GET /models for discovery. -function installFetch() { - const closeBodies: any[] = []; - const fetchMock = vi.fn(async (url: any, init: any = {}) => { - const u = String(url); - if (init.method === "POST" && u.includes("/chat/completions")) { - closeBodies.push(JSON.parse(init.body)); - return { ok: true, json: async () => ({ choices: [] }) } as any; - } - // model discovery - return { ok: true, json: async () => ({ data: [{ id: "nvidia/MiniMax-M2.7-NVFP4" }] }) } as any; - }); - vi.stubGlobal("fetch", fetchMock); - return closeBodies; -} - -describe("program close (trajectory_final) — multiturn", () => { - const savedEnv = process.env; - beforeEach(() => { - process.env = { - ...savedEnv, - DYN_REQUEST_TRACE: "1", - DYN_AGENT_SESSION_ID: "t-1", - DYN_AGENT_TRAJECTORY_ID: "t-1", - DYNAMO_BASE_URL: "http://frontend:8000/v1", - }; - delete process.env.PI_SUBAGENT_CHILD; // ensure lead-agent path, not subagent - }); - afterEach(() => { - process.env = savedEnv; - vi.unstubAllGlobals(); - vi.restoreAllMocks(); - }); - - it("does NOT hook agent_end (no 
per-turn close)", async () => { - const closeBodies = installFetch(); - const { pi, handlers } = makePi(); - await dynamoProviderExtension(pi as any); - expect(handlers.agent_end).toBeUndefined(); - expect(closeBodies).toHaveLength(0); - }); - - it("does NOT close on continuation reasons (reload/fork)", async () => { - const closeBodies = installFetch(); - const { pi, handlers } = makePi(); - await dynamoProviderExtension(pi as any); - await handlers.session_shutdown!({ type: "session_shutdown", reason: "reload" }); - await handlers.session_shutdown!({ type: "session_shutdown", reason: "fork" }); - expect(closeBodies).toHaveLength(0); - }); - - it("closes exactly once on session_shutdown reason 'quit', carrying trajectory_final", async () => { - const closeBodies = installFetch(); - const { pi, handlers } = makePi(); - await dynamoProviderExtension(pi as any); - await handlers.session_shutdown!({ type: "session_shutdown", reason: "quit" }); - // idempotent: a second quit (or any later event) must not re-close - await handlers.session_shutdown!({ type: "session_shutdown", reason: "quit" }); - expect(closeBodies).toHaveLength(1); - const ctx = closeBodies[0].nvext.agent_context; - expect(ctx.trajectory_final).toBe(true); - expect(ctx.trajectory_id).toBe("t-1"); - expect(closeBodies[0].max_tokens).toBe(1); - }); - - it("uses a stable discovered model id for the shutdown close ping", async () => { - const closeBodies = installFetch(); - const { pi, handlers } = makePi((providerConfig) => { - providerConfig.models[0].id = ""; - }); - await dynamoProviderExtension(pi as any); - await handlers.session_shutdown!({ type: "session_shutdown", reason: "quit" }); - - expect(closeBodies).toHaveLength(1); - expect(closeBodies[0].model).toBe("nvidia/MiniMax-M2.7-NVFP4"); - }); -}); - -describe("subagent trajectory close", () => { - const savedEnv = process.env; - beforeEach(() => { - process.env = { - ...savedEnv, - DYN_REQUEST_TRACE: "1", - DYN_AGENT_SESSION_ID: "root-session", - 
DYN_AGENT_TRAJECTORY_ID: "root-trajectory", - DYNAMO_BASE_URL: "http://frontend:8000/v1", - PI_SUBAGENT_CHILD: "1", - PI_SUBAGENT_RUN_ID: "root-session", - PI_SUBAGENT_CHILD_AGENT: "researcher", - PI_SUBAGENT_CHILD_INDEX: "0", - }; - }); - afterEach(() => { - process.env = savedEnv; - vi.unstubAllGlobals(); - vi.restoreAllMocks(); - }); - - it("closes the child trajectory on agent_end", async () => { - const closeBodies = installFetch(); - const { pi, handlers } = makePi(); - await dynamoProviderExtension(pi as any); - await handlers.agent_end!({ type: "agent_end", messages: [] }); - await handlers.session_shutdown!({ type: "session_shutdown", reason: "quit" }); - - expect(closeBodies).toHaveLength(1); - const ctx = closeBodies[0].nvext.agent_context; - expect(ctx.trajectory_final).toBe(true); - expect(ctx.trajectory_id).toBe("root-session:researcher:0"); - expect(ctx.parent_trajectory_id).toBe("root-trajectory"); - }); -}); diff --git a/test/tool-relay.test.ts b/test/tool-relay.test.ts deleted file mode 100644 index 9f4d3e8..0000000 --- a/test/tool-relay.test.ts +++ /dev/null @@ -1,243 +0,0 @@ -// SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
-// SPDX-License-Identifier: Apache-2.0 - -import { decode } from "@msgpack/msgpack"; -import type { ExtensionContext } from "@mariozechner/pi-coding-agent"; -import { describe, expect, it } from "vitest"; -import { DEFAULT_DYNAMO_BASE_URL, DEFAULT_SESSION_TYPE_ID } from "../src/dynamo-provider.js"; -import { - buildDynamoRequestTraceAgentContext, - DEFAULT_TOOL_EVENT_QUEUE_CAPACITY, - DynamoToolEventPublisher, - DynamoToolEventRelay, - getToolClass, - readDynamoToolRelayConfig, - type DynamoRequestTraceRecord, - type ToolEventSocket, -} from "../src/tool-relay.js"; - -const config = { - baseUrl: DEFAULT_DYNAMO_BASE_URL, - apiKey: "test-key", - traceEnabled: true, - sessionTypeId: DEFAULT_SESSION_TYPE_ID, -}; - -class FakeToolEventSocket implements ToolEventSocket { - connectedEndpoint: string | undefined; - closed = false; - readonly sent: [Buffer, Buffer, Buffer][] = []; - - async connect(endpoint: string): Promise { - this.connectedEndpoint = endpoint; - } - - async send(frames: [Buffer, Buffer, Buffer]): Promise { - this.sent.push(frames); - } - - close(): void { - this.closed = true; - } -} - -function createContext(sessionId: string): ExtensionContext { - return { - sessionManager: { - getSessionId: () => sessionId, - }, - } as unknown as ExtensionContext; -} - -function decodeTraceRecord(frame: Buffer): DynamoRequestTraceRecord { - return decode(frame) as DynamoRequestTraceRecord; -} - -describe("tool relay config", () => { - it("reads Dynamo tool relay env aliases", () => { - expect( - readDynamoToolRelayConfig({ - DYN_REQUEST_TRACE_TOOL_EVENTS_ZMQ_ENDPOINT: "tcp://127.0.0.1:20390", - DYN_REQUEST_TRACE_TOOL_EVENTS_ZMQ_TOPIC: "tools", - }), - ).toEqual({ - endpoint: "tcp://127.0.0.1:20390", - topic: "tools", - queueCapacity: DEFAULT_TOOL_EVENT_QUEUE_CAPACITY, - }); - - expect( - readDynamoToolRelayConfig({ - DYN_REQUEST_TRACE_TOOL_EVENTS_ZMQ_ENDPOINT: "ipc:///tmp/pi-tools", - DYN_REQUEST_TRACE_TOOL_EVENTS_ZMQ_TOPIC: "pi-tools", - 
DYN_REQUEST_TRACE_TOOL_EVENTS_QUEUE_CAPACITY: "7", - }), - ).toEqual({ - endpoint: "ipc:///tmp/pi-tools", - topic: "pi-tools", - queueCapacity: 7, - }); - }); -}); - -describe("tool relay agent context", () => { - it("uses the Pi session ID as default trajectory and session ID", () => { - expect(buildDynamoRequestTraceAgentContext(config, "pi-session")).toEqual({ - session_type_id: DEFAULT_SESSION_TYPE_ID, - session_id: "pi-session", - trajectory_id: "pi-session", - }); - }); - - it("uses env session/trajectory IDs when provided", () => { - expect( - buildDynamoRequestTraceAgentContext( - { - ...config, - sessionId: "session-1", - trajectoryId: "trajectory-1", - parentTrajectoryId: "parent-1", - }, - "pi-session", - ), - ).toEqual({ - session_type_id: DEFAULT_SESSION_TYPE_ID, - session_id: "session-1", - trajectory_id: "trajectory-1", - parent_trajectory_id: "parent-1", - }); - }); -}); - -describe("tool relay records", () => { - it("publishes msgpack-framed tool_start and tool_end records", async () => { - const socket = new FakeToolEventSocket(); - const publisher = new DynamoToolEventPublisher( - { endpoint: "tcp://127.0.0.1:20390", topic: "tools", queueCapacity: 10 }, - () => socket, - ); - await publisher.start(); - - let unixMs = 1000; - let perfMs = 10; - const relay = new DynamoToolEventRelay( - { ...config, sessionId: "session-1" }, - publisher, - () => unixMs, - () => perfMs, - ); - - relay.handleToolExecutionStart( - { toolCallId: "call-1", toolName: "agent_tools---search", args: { query: "hello" } }, - createContext("pi-session"), - ); - await publisher.flush(); - - expect(socket.connectedEndpoint).toBe("tcp://127.0.0.1:20390"); - expect(socket.sent).toHaveLength(1); - expect(socket.sent[0]?.[0].toString("utf8")).toBe("tools"); - expect(socket.sent[0]?.[1].readBigUInt64BE()).toBe(0n); - expect(decodeTraceRecord(socket.sent[0]?.[2] ?? 
Buffer.alloc(0))).toEqual({ - schema: "dynamo.request.trace.v1", - event_type: "tool_start", - event_time_unix_ms: 1000, - event_source: "harness", - agent_context: { - session_type_id: DEFAULT_SESSION_TYPE_ID, - session_id: "session-1", - trajectory_id: "pi-session", - }, - tool: { - tool_call_id: "call-1", - tool_class: "agent_tools", - started_at_unix_ms: 1000, - status: "running", - }, - }); - - unixMs = 1500; - perfMs = 15.25; - relay.handleToolExecutionEnd( - { - toolCallId: "call-1", - toolName: "agent_tools---search", - result: { content: [{ type: "text", text: "done" }] }, - isError: false, - }, - createContext("pi-session"), - ); - await publisher.flush(); - - expect(socket.sent).toHaveLength(2); - expect(socket.sent[1]?.[1].readBigUInt64BE()).toBe(1n); - expect(decodeTraceRecord(socket.sent[1]?.[2] ?? Buffer.alloc(0))).toEqual({ - schema: "dynamo.request.trace.v1", - event_type: "tool_end", - event_time_unix_ms: 1500, - event_source: "harness", - agent_context: { - session_type_id: DEFAULT_SESSION_TYPE_ID, - session_id: "session-1", - trajectory_id: "pi-session", - }, - tool: { - tool_call_id: "call-1", - tool_class: "agent_tools", - started_at_unix_ms: 1000, - ended_at_unix_ms: 1500, - duration_ms: 5.25, - status: "succeeded", - output_bytes: 4, - }, - }); - }); - - it("publishes self-contained terminal errors even without a start event", async () => { - const socket = new FakeToolEventSocket(); - const publisher = new DynamoToolEventPublisher( - { endpoint: "tcp://127.0.0.1:20390", topic: "tools", queueCapacity: 10 }, - () => socket, - ); - await publisher.start(); - - const relay = new DynamoToolEventRelay(config, publisher, () => 2000, () => 20); - relay.handleToolExecutionEnd( - { - toolCallId: "call-2", - toolName: "bash", - result: { content: [{ type: "text", text: "failed" }] }, - isError: true, - }, - createContext("pi-session"), - ); - await publisher.flush(); - - expect(decodeTraceRecord(socket.sent[0]?.[2] ?? 
Buffer.alloc(0))).toEqual({ - schema: "dynamo.request.trace.v1", - event_type: "tool_error", - event_time_unix_ms: 2000, - event_source: "harness", - agent_context: { - session_type_id: DEFAULT_SESSION_TYPE_ID, - session_id: "pi-session", - trajectory_id: "pi-session", - }, - tool: { - tool_call_id: "call-2", - tool_class: "bash", - started_at_unix_ms: 2000, - ended_at_unix_ms: 2000, - duration_ms: 0, - status: "error", - error_type: "pi_tool_error", - output_bytes: 6, - }, - }); - }); - - it("normalizes Pi and MCP-style tool names to tool classes", () => { - expect(getToolClass("agent_tools---search")).toBe("agent_tools"); - expect(getToolClass("mcp/server.tool")).toBe("mcp"); - expect(getToolClass("bash")).toBe("bash"); - }); -});