diff --git a/.env.example b/.env.example index 665cf382..a106849e 100644 --- a/.env.example +++ b/.env.example @@ -72,6 +72,7 @@ # KIMI_FOR_CODING_BASE_URL=https://api.kimi.com/coding # MAX_TOKENS=4096 # Cap LLM completion tokens for compression / summarise calls +# AGENTMEMORY_SESSION_TOKEN_CAP=100000 # Estimated LLM input+output token cap per session for compress/summarise calls. Set 0 to disable. # AGENTMEMORY_COMPRESS_MODEL=cheap-model # Optional model for provider.compress(); provider.summarize() keeps the provider model # AGENTMEMORY_OUTPUT_LANG=match # Optional generated-text language. Empty/unset keeps default prompts unchanged. # # Use "match" to follow input/observation language, a known code such as de/ja/pt-BR, diff --git a/README.md b/README.md index f0b67e0c..76804a2a 100644 --- a/README.md +++ b/README.md @@ -1696,6 +1696,10 @@ Create `~/.agentmemory/.env`: # AGENTMEMORY_CODEX_TIMEOUT_MS=60000 # Optional; overrides AGENTMEMORY_LLM_TIMEOUT_MS # AGENTMEMORY_COMPRESS_MODEL=cheap-model # Optional: provider.compress() model override; # # provider.summarize() keeps the provider model. +# AGENTMEMORY_SESSION_TOKEN_CAP=100000 # Estimated per-session cap for LLM-backed +# # compress/summarize calls. Set 0 to disable. +# # /agentmemory/session/start can override +# # this per session with sessionTokenCap. # Opt-in Claude-subscription fallback (requires npm install @anthropic-ai/claude-agent-sdk); # spawns @anthropic-ai/claude-agent-sdk sessions; # leave OFF unless you understand the Stop-hook recursion risk (#149 follow-up): @@ -1761,6 +1765,7 @@ Create `~/.agentmemory/.env`: # BM25_WEIGHT=0.4 # VECTOR_WEIGHT=0.6 # TOKEN_BUDGET=2000 +# AGENTMEMORY_SESSION_TOKEN_CAP=100000 # Auth # AGENTMEMORY_SECRET=your-secret @@ -1890,7 +1895,7 @@ curl -X POST "$AGENTMEMORY_URL/agentmemory/import" \ | Method | Path | Description | |--------|------|-------------| | `GET` | `/agentmemory/health` | Health check (always public) | -| `POST` | `/agentmemory/session/start` | Start session + get context; accepts optional `title`, `summary`, `firstPrompt`, `model`, `agent`, `metadata`, and `agentId` | +| `POST` | `/agentmemory/session/start` | Start session + get context; accepts optional `title`, `summary`, `firstPrompt`, `model`, `agent`, `metadata`, `agentId`, and per-session `sessionTokenCap` | | `POST` | `/agentmemory/session/end` | End session | | `POST` | `/agentmemory/observe` | Capture observation | | `POST` | `/agentmemory/smart-search` | Hybrid search | diff --git a/docs/todos/2026-06-19-issue-311-session-token-budget/arena-synthesis.md b/docs/todos/2026-06-19-issue-311-session-token-budget/arena-synthesis.md new file mode 100644 index 00000000..032b2c5f --- /dev/null +++ b/docs/todos/2026-06-19-issue-311-session-token-budget/arena-synthesis.md @@ -0,0 +1,82 @@ +# Issue 311 Arena Synthesis + +## Verdict + +Base: Candidate 2 (`/tmp/arena-issue-311-session-token-budget/candidate-2/solution.md`). + +Cross-judge recommendation: Candidate 2 scored 24/25, Candidate 3 scored 22/25, Candidate 1 scored 20/25. + +I independently read all three candidate artifacts end to end and agree with the judge. Candidate 2 best fits the repository because it keeps budget mutation behind registered iii functions and an `sdk.trigger()` budget guard, avoids new MCP/REST surfaces, and gives the smallest coherent file/test plan. + +## Grafts + +- From Candidate 3: + - Use reservation-style accounting or equivalent settlement semantics so concurrent chunked summaries cannot overspend by racing preflight checks. + - Add stale-reservation cleanup behavior and tests. + - Include provider-failure settlement tests: provider failures after a request starts count prompt tokens conservatively; local budget blocks do not count as provider failures. + - Use a typed `SessionTokenBudgetExceededError` with only safe fields. + - Keep explicit tests for concurrent reservations, fork-fresh session ids, system sentinel attribution, and fallback-chain risk. +- From Candidate 1: + - Check an already-open circuit breaker before reserving or charging budget. + - Include `sourceFunction` or `purpose` provider-call context so budget logs/audit/status can identify `mem::compress`, `mem::summarize`, chunk/reduce calls, and background work without recording prompt text. + - Keep `describeImage()` outside the first implementation unless the issue owner explicitly expands scope to vision-call budgeting. + +## Rejections + +- Do not pass a direct StateKV-backed controller into providers when a late-bound `sdk.trigger()` guard can enforce the policy through registered functions. +- Do not add MCP tools for this feature. +- Do not add a dedicated REST status endpoint in the first pass; ride on `/agentmemory/health` for `agentmemory status` counts to avoid endpoint-count churn. +- Do not reserve budget before checking an already-open circuit breaker. +- Do not expand provider return types to exact token usage in issue #311. Current providers return strings, so estimated accounting is the correct first pass. +- Do not enable a hardcoded default cap without confirmation. Disabled-by-default via unset `AGENTMEMORY_SESSION_TOKEN_CAP` is the safer backward-compatible baseline; per-session overrides can opt in. + +## Synthesized Design + +Implement a new StateKV-backed session-budget subsystem: + +- `KV.sessionBudgets = "mem:session-budgets"`. +- `SessionBudget` types in `src/types.ts`. +- `mem::session-budget-init`, `mem::session-budget-reserve` or `mem::session-budget-record` with preflight semantics, `mem::session-budget-status`, and `mem::session-budget-reap`. +- `withKeyedLock("session-budget:")` around budget mutation. +- Missing session id maps to a fixed `SYSTEM_SESSION_BUDGET_ID = "__system__"`. +- Token accounting uses the repo's existing estimate style, `Math.max(1, Math.ceil(text.length / 3))`, because `MemoryProvider` currently returns strings and no usage metadata. +- Soft warning threshold is 80 percent and emits once per row. +- Hard block happens before provider call when a row is exhausted or the prompt estimate cannot fit remaining/reserved budget. +- The call that crosses the cap after output accounting may finish, then future calls block. +- Reaper clears stale reservations and old completed/orphan budget rows. + +Wire enforcement centrally: + +- Add optional provider-call context to `MemoryProvider.compress()` and `MemoryProvider.summarize()`. +- Add a late-bound `ProviderBudgetGuard` to `ResilientProvider`; the guard uses `sdk.trigger()` against registered session-budget functions, not direct KV. +- Apply output-language prompt changes before token estimation. +- Check circuit breaker before budget reservation for already-open local breaker. +- Do not count local budget blocks as circuit-breaker failures. +- Pass session context at known session call sites: `mem::compress`, `mem::summarize`, retry helper, sliding-window, skill-extract, crystallize, single-session graph/temporal graph; use sentinel for mixed/background calls. + +Fallback behavior: + +- `mem::compress` catches `SessionTokenBudgetExceededError` and writes existing synthetic compression. +- `mem::summarize` catches the same error and writes an honest deterministic synthetic summary that states LLM summarization was skipped because the session budget was exhausted. +- Other provider-backed functions return their existing structured error shape unless they already have a deterministic fallback. + +Status and docs: + +- Add budget counts to `/agentmemory/health`, then print `Budgets: N active, M near cap, K exhausted` in `agentmemory status`. +- Document `AGENTMEMORY_SESSION_TOKEN_CAP` separately from context `TOKEN_BUDGET`. +- Add `.env.example` entry. + +## Verification Target + +Before implementation claims: + +- Targeted red/green vitest for session-budget functions, provider guard, API session start, compression fallback, summarization fallback, CLI/status display, config parsing, and schema constants. +- `corepack pnpm test`. +- `corepack pnpm run lint`. +- `corepack pnpm run build`. +- Semgrep for code/config/persistence/API changes. +- Staged Gitleaks before commit. + +## Current Blocker + +Human Checkpoint remains required before production implementation edits because the synthesized design changes persisted state/schema, API/session-start behavior, and provider interface behavior. diff --git a/docs/todos/2026-06-19-issue-311-session-token-budget/plan.md b/docs/todos/2026-06-19-issue-311-session-token-budget/plan.md new file mode 100644 index 00000000..a5e468b6 --- /dev/null +++ b/docs/todos/2026-06-19-issue-311-session-token-budget/plan.md @@ -0,0 +1,1091 @@ +# Issue 311 Session Token Budget Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Add a per-session estimated LLM token budget with an 80% soft warning and hard block after exhaustion. + +**Architecture:** Add a StateKV-backed `mem:session-budget` scope, `AsyncLocalStorage` session context, registered iii budget init/record functions, a budget reap trigger, budget soft-warn/exhausted events, and a provider-level guard in `ResilientProvider`. Budget accounting is high-level per `compress()` / `summarize()` call because current providers expose string responses but no exact token usage. `compress`/`summarize` degrade through deterministic no-LLM fallbacks on budget exhaustion. + +**Tech Stack:** TypeScript ESM, iii-sdk `registerFunction`/`sdk.trigger`, StateKV, vitest, pnpm. + +--- + +## Sources Of Truth + +- Issue: GitHub issue #311. +- Task record: `docs/todos/2026-06-19-issue-311-session-token-budget/todo.md`. +- Arena synthesis: `docs/todos/2026-06-19-issue-311-session-token-budget/arena-synthesis.md`. +- No separate durable spec exists. Use the task record, arena synthesis, and the latest user-visible design decision as the source of truth. + +## Accepted Review Corrections + +Pre-implementation review found acceptance mismatches in the first plan. These corrections override any conflicting older task steps below. + +- Use `AsyncLocalStorage` for session id propagation. Do not rely on optional provider-call parameters as the primary mechanism. +- Use exact KV scope `mem:session-budget`. +- Keep the public budget surface to `mem::session::budget::init` and `mem::session::budget::record`; add only the internal function needed to host the reap trigger if iii registration requires a function id. +- Initialize budgets from REST session start, durable session start, implicit session creation in `mem::observe`, and lazy init inside budget record for missed sessions/system sentinel. +- Emit `event::mem::budget::soft-warned` once at 80% and `event::mem::budget::exhausted` when the hard cap is reached or a future call is blocked. +- Register and record OTEL histogram `agentmemory.session.tokens_used`. +- Failed provider calls record a budget event with `inputTokens: 0` and `outputTokens: 0`; they do not charge estimated prompt tokens. +- Budget exhaustion inside chunked summarize aborts before the next chunk/reduce call and persists a deterministic summary with `truncated: true`. +- Fallback-chain retries are treated as one high-level call in this first implementation, because token usage is estimated from the final string-based provider contract. Do not claim per-provider-attempt accounting until providers expose real usage. + +## Boundary Decision Confirmed + +Implementation changes persisted KV state/schema, provider guard behavior, session-start API behavior, health/status output, and telemetry. The user replied `weiter` after the checkpoint, so implementation may proceed within the issue acceptance boundaries above. + +Plan uses this default policy from the latest recommendation: + +- `AGENTMEMORY_SESSION_TOKEN_CAP` unset or blank defaults to `100000`. +- `AGENTMEMORY_SESSION_TOKEN_CAP=0` disables budget enforcement. +- A positive per-session `sessionTokenCap` overrides the default for that session. +- Invalid configured values fall back to `100000`; invalid API override values return HTTP 400. + +## Feature / Verification Matrix + +| Change | Verification method | Status | Evidence | +| --- | --- | --- | --- | +| Config and type/schema additions | `test/session-budget.test.ts`, `test/env-loader.test.ts`, TypeScript build | Pending | New `SessionBudget` types and `KV.sessionBudgets` | +| Budget init/record/reap functions | Red/green `test/session-budget.test.ts` | Pending | Covers default cap, disable, per-session override, warning/exhausted events, concurrent increment, failed-call zero increment, system sentinel, lazy init, reap | +| AsyncLocalStorage session context | Red/green `test/session-context.test.ts`, call-site tests | Pending | `mem::observe`, `mem::compress`, `mem::summarize`, and consolidation pipeline enter session context | +| Provider enforcement | Red/green `test/resilient-provider.test.ts` | Pending | Blocks before inner provider call, records completion/failure, does not charge open circuit, emits hard block | +| Session call-site fallbacks | Red/green `test/compress-budget.test.ts`, `test/summarize.test.ts` | Pending | Synthetic compression/summary when budget exhausted; chunk/reduce exhaustion stops retries and marks summary `truncated: true` | +| Session start, implicit session, health, CLI, docs | `test/api-session-start.test.ts`, `test/observe-implicit-session.test.ts`, CLI renderer/status tests, README/.env grep | Pending | `sessionTokenCap`, health budget counts, `agentmemory status` line | +| Telemetry acceptance | `test/telemetry-setup.test.ts` or focused source/behavior assertion | Pending | OTEL histogram `agentmemory.session.tokens_used` exists and budget record observes tokens | +| Final gates | `corepack pnpm test`, `corepack pnpm run lint`, `corepack pnpm run build`, Semgrep, staged Gitleaks | Pending | Required before commit/PR | + +## Subagent Ledger + +| Workstream | Scope | Edits allowed | Expected output | Result | Residual risk | +| --- | --- | --- | --- | --- | --- | +| Pre-implementation reviewers | Plan, task record, arena synthesis, repo source | No | High/Medium findings before implementation | Done | Findings accepted: ALS, events, OTEL, implicit sessions, chunked summarize fallback, API harness, CLI behavior coverage. | +| Implementation workers | Partitioned by plan tasks after review | Yes, disjoint task-owned scopes only | TDD patches plus targeted verification | Pending | Shared provider/types surface requires serial integration or one integrator. | +| Final reviewers | Diff after targeted tests | No | Security, test coverage, maintainability findings | Pending | Persistence/API/provider changes are security-sensitive. | + +## Files + +Create: + +- `src/functions/session-budget.ts` - registered budget functions, budget guard factory, token estimator, typed budget error. +- `test/session-budget.test.ts` - focused budget state/function tests. +- `test/compress-budget.test.ts` - budget-exhausted compression fallback tests. + +Modify: + +- `src/state/schema.ts` - add `KV.sessionBudgets`. +- `src/types.ts` - add budget/provider context types and audit operation union entries. +- `src/config.ts` - parse `AGENTMEMORY_SESSION_TOKEN_CAP`. +- `src/providers/resilient.ts` - enforce budget guard around `compress()` and `summarize()`. +- `src/providers/fallback-chain.ts` - forward optional provider context. +- `src/eval/self-correct.ts` - forward provider context through compression retries. +- `src/functions/compress.ts` - pass context and synthesize fallback on budget exhaustion. +- `src/functions/summarize.ts` - pass context and synthesize fallback summary on budget exhaustion. +- `src/functions/sliding-window.ts`, `src/functions/skill-extract.ts`, `src/functions/crystallize.ts`, `src/functions/graph.ts`, `src/functions/temporal-graph.ts` - pass session context when clearly available. +- `src/functions/consolidate.ts`, `src/functions/consolidation-pipeline.ts`, `src/functions/query-expansion.ts`, `src/functions/compress-file.ts`, `src/functions/flow-compress.ts`, `src/functions/reflect.ts` - rely on system sentinel unless a single session id is already explicit. +- `src/triggers/api.ts` - whitelist `sessionTokenCap` on session start and include budget status in health. +- `src/triggers/events.ts` - initialize budget on durable session start. +- `src/index.ts` - register session-budget functions, late-bind provider guard, start reap interval. +- `src/cli.ts` - display session budget status. +- `src/telemetry/setup.ts` - add budget counters/histogram if implementation needs explicit non-OTEL function metrics. +- `.env.example`, `README.md` - document the cap and default/disable behavior. +- Existing tests: `test/resilient-provider.test.ts`, `test/api-session-start.test.ts`, `test/summarize.test.ts`, `test/session-end-triggers-graph.test.ts`, `test/api-boundary-coverage.test.ts`, `test/env-loader.test.ts`, and any provider-call-site tests requiring type updates. + +## Task 1: Budget Types, Config, And Schema + +**Files:** + +- Modify: `src/state/schema.ts` +- Modify: `src/types.ts` +- Modify: `src/config.ts` +- Test: `test/session-budget.test.ts` +- Test: `test/env-loader.test.ts` + +- [ ] **Step 1: Write failing tests for config and schema** + +Add initial assertions: + +```ts +// test/session-budget.test.ts +import { describe, expect, it } from "vitest"; +import { KV } from "../src/state/schema.js"; +import { + getSessionTokenCap, + DEFAULT_SESSION_TOKEN_CAP, +} from "../src/config.js"; + +describe("session token budget config", () => { + it("adds the session budget KV scope", () => { + expect(KV.sessionBudgets).toBe("mem:session-budgets"); + }); + + it("defaults to 100000 tokens when unset or blank", () => { + expect(DEFAULT_SESSION_TOKEN_CAP).toBe(100000); + expect(getSessionTokenCap({})).toBe(100000); + expect(getSessionTokenCap({ AGENTMEMORY_SESSION_TOKEN_CAP: "" })).toBe(100000); + }); + + it("allows zero to disable session budget enforcement", () => { + expect(getSessionTokenCap({ AGENTMEMORY_SESSION_TOKEN_CAP: "0" })).toBe(0); + }); + + it("accepts positive integer override values", () => { + expect(getSessionTokenCap({ AGENTMEMORY_SESSION_TOKEN_CAP: "25000" })).toBe(25000); + }); + + it("falls back to default for malformed configured values", () => { + expect(getSessionTokenCap({ AGENTMEMORY_SESSION_TOKEN_CAP: "100k" })).toBe(100000); + expect(getSessionTokenCap({ AGENTMEMORY_SESSION_TOKEN_CAP: "-1" })).toBe(100000); + expect(getSessionTokenCap({ AGENTMEMORY_SESSION_TOKEN_CAP: "1.5" })).toBe(100000); + }); +}); +``` + +- [ ] **Step 2: Run tests and verify RED** + +Run: + +```bash +corepack pnpm exec vitest run test/session-budget.test.ts +``` + +Expected: fail because `KV.sessionBudgets`, `DEFAULT_SESSION_TOKEN_CAP`, and `getSessionTokenCap()` do not exist. + +- [ ] **Step 3: Add minimal config/schema/types** + +Implement: + +```ts +// src/state/schema.ts +sessionBudgets: "mem:session-budgets", +``` + +```ts +// src/config.ts +export const DEFAULT_SESSION_TOKEN_CAP = 100_000; + +export function getSessionTokenCap( + env: Record = getMergedEnv(), +): number { + const raw = env["AGENTMEMORY_SESSION_TOKEN_CAP"]; + if (raw === undefined || raw.trim() === "") return DEFAULT_SESSION_TOKEN_CAP; + if (raw.trim() === "0") return 0; + if (!/^\d+$/.test(raw.trim())) return DEFAULT_SESSION_TOKEN_CAP; + const parsed = Number(raw.trim()); + return Number.isSafeInteger(parsed) && parsed > 0 + ? parsed + : DEFAULT_SESSION_TOKEN_CAP; +} +``` + +```ts +// src/types.ts +export type SessionBudgetStatus = "active" | "near_cap" | "exhausted"; +export type SessionBudgetSource = + | "env" + | "session_override" + | "implicit" + | "system"; +export type SessionBudgetOperation = "compress" | "summarize"; + +export interface SessionBudgetReservation { + id: string; + tokens: number; + operation: SessionBudgetOperation; + sourceFunction?: string; + providerName?: string; + reservedAt: string; +} + +export interface SessionBudget { + sessionId: string; + capTokens: number; + usedTokens: number; + reservedTokens: number; + reservations: Record; + status: SessionBudgetStatus; + source: SessionBudgetSource; + estimated: true; + createdAt: string; + updatedAt: string; + lastRecordedAt?: string; + softWarnedAt?: string; + exhaustedAt?: string; + lastBlockedAt?: string; + callCount: number; + blockedCallCount: number; + failedCallCount: number; +} + +export interface ProviderCallContext { + sessionId?: string; + sourceFunction?: string; + purpose?: string; +} +``` + +Update `AgentMemoryConfig` with `sessionTokenCap: number`. + +- [ ] **Step 4: Wire config into `loadConfig()`** + +Add `sessionTokenCap: getSessionTokenCap(env)` to the returned config object. + +- [ ] **Step 5: Run tests and verify GREEN** + +Run: + +```bash +corepack pnpm exec vitest run test/session-budget.test.ts test/env-loader.test.ts +``` + +Expected: pass for the new config/schema tests and existing env-loader coverage. + +## Task 2: Session Budget Functions + +**Files:** + +- Create: `src/functions/session-budget.ts` +- Modify: `src/types.ts` +- Test: `test/session-budget.test.ts` + +- [ ] **Step 1: Write failing tests for init/reserve/record/status** + +Add tests that register the budget functions with `mockSdk()` / `mockKV()` and assert: + +```ts +it("initializes budget rows from the default cap", async () => { + const { sdk, kv } = setupBudgetFunctions(100000); + const result = await sdk.trigger("mem::session-budget-init", { sessionId: "ses_1" }); + expect(result).toMatchObject({ success: true, enabled: true }); + expect(await kv.get(KV.sessionBudgets, "ses_1")).toMatchObject({ + sessionId: "ses_1", + capTokens: 100000, + usedTokens: 0, + status: "active", + }); +}); + +it("does not write rows when cap is disabled", async () => { + const { sdk, kv } = setupBudgetFunctions(0); + const result = await sdk.trigger("mem::session-budget-init", { sessionId: "ses_1" }); + expect(result).toMatchObject({ success: true, enabled: false }); + expect(await kv.get(KV.sessionBudgets, "ses_1")).toBeNull(); +}); + +it("reserves prompt tokens and records completion tokens", async () => { + const { sdk, kv } = setupBudgetFunctions(100); + const reserve = await sdk.trigger("mem::session-budget-reserve", { + sessionId: "ses_1", + operation: "compress", + tokens: 30, + providerName: "test", + sourceFunction: "mem::compress", + }); + expect(reserve).toMatchObject({ allowed: true, reservationId: expect.any(String) }); + await sdk.trigger("mem::session-budget-record", { + sessionId: "ses_1", + reservationId: reserve.reservationId, + operation: "compress", + outputTokens: 20, + success: true, + }); + expect(await kv.get(KV.sessionBudgets, "ses_1")).toMatchObject({ + usedTokens: 50, + reservedTokens: 0, + callCount: 1, + }); +}); +``` + +Also add tests for: + +- soft warning once at 80%. +- completion crossing cap marks exhausted. +- preflight after exhaustion blocks and increments `blockedCallCount`. +- missing `sessionId` uses `SYSTEM_SESSION_BUDGET_ID`. +- fork-fresh behavior with `ses_parent` and `ses_fork`. +- concurrent `Promise.all()` reservations do not lose increments. +- reaper deletes old completed/orphan rows and stale reservations. + +- [ ] **Step 2: Run tests and verify RED** + +Run: + +```bash +corepack pnpm exec vitest run test/session-budget.test.ts +``` + +Expected: fail because `registerSessionBudgetFunctions()` and budget functions do not exist. + +- [ ] **Step 3: Implement `src/functions/session-budget.ts`** + +Create: + +```ts +export const SYSTEM_SESSION_BUDGET_ID = "__system__"; +export const SESSION_BUDGET_WARNING_RATIO = 0.8; + +export class SessionTokenBudgetExceededError extends Error { + readonly code = "session_token_budget_exhausted"; + constructor( + readonly sessionId: string, + readonly capTokens: number, + readonly usedTokens: number, + readonly reservedTokens: number, + readonly operation: SessionBudgetOperation, + ) { + super("session_token_budget_exhausted"); + } +} + +export function estimateTokens(text: string): number { + return Math.max(1, Math.ceil(text.length / 3)); +} +``` + +Register: + +- `mem::session-budget-init` +- `mem::session-budget-reserve` +- `mem::session-budget-record` +- `mem::session-budget-status` +- `mem::session-budget-reap` + +Use `withKeyedLock(\`session-budget:${sessionId}\`, ...)` for all row mutation. + +Keep payload validation explicit: + +```ts +function asSessionId(value: unknown): string | null { + return typeof value === "string" && value.trim() ? value.trim() : null; +} + +function asPositiveInt(value: unknown): number | null { + return Number.isInteger(value) && Number(value) > 0 ? Number(value) : null; +} +``` + +State transitions: + +- `active`: `usedTokens + reservedTokens < capTokens * 0.8` +- `near_cap`: at or above threshold and below cap +- `exhausted`: `usedTokens + reservedTokens >= capTokens` + +- [ ] **Step 4: Add audit operations** + +Add to `AuditEntry.operation` in `src/types.ts`: + +```ts +| "session_budget_init" +| "session_budget_record" +| "session_budget_block" +| "session_budget_reap" +``` + +Do not include prompt or response content in audit details. + +- [ ] **Step 5: Run tests and verify GREEN** + +Run: + +```bash +corepack pnpm exec vitest run test/session-budget.test.ts test/audit.test.ts +``` + +Expected: pass. + +## Task 3: Provider Guard And ResilientProvider Enforcement + +**Files:** + +- Modify: `src/types.ts` +- Modify: `src/functions/session-budget.ts` +- Modify: `src/providers/resilient.ts` +- Modify: `src/providers/fallback-chain.ts` +- Test: `test/resilient-provider.test.ts` + +- [ ] **Step 1: Write failing provider guard tests** + +Add tests: + +```ts +it("blocks before calling the inner provider when budget reserve denies", async () => { + const inner = makeCapturingProvider(); + const provider = new ResilientProvider(inner); + provider.setBudgetGuard({ + reserve: vi.fn(async () => ({ + allowed: false, + enabled: true, + reason: "session_token_budget_exhausted", + sessionId: "ses_1", + capTokens: 10, + usedTokens: 10, + reservedTokens: 0, + })), + record: vi.fn(), + }); + + await expect( + provider.compress("system", "user", { sessionId: "ses_1", sourceFunction: "mem::compress" }), + ).rejects.toMatchObject({ code: "session_token_budget_exhausted" }); + expect(inner.calls).toEqual([]); + expect(provider.circuitState.state).toBe("closed"); +}); + +it("checks an open circuit breaker before reserving budget", async () => { + const inner = makeFailingProvider(); + const provider = new ResilientProvider(inner); + const guard = { reserve: vi.fn(), record: vi.fn() }; + provider.setBudgetGuard(guard); + for (let i = 0; i < 3; i++) { + await expect(provider.compress("system", "user")).rejects.toThrow(); + } + await expect(provider.compress("system", "user")).rejects.toThrow("circuit_breaker_open"); + expect(guard.reserve).not.toHaveBeenCalledWith(expect.objectContaining({ sourceFunction: undefined })); +}); +``` + +Also assert: + +- output-language-adjusted system prompt is used for token estimation. +- success records output tokens. +- provider failure records failed prompt attempt. +- missing context reserves against sentinel. +- `FallbackChainProvider` forwards the optional context. + +- [ ] **Step 2: Run tests and verify RED** + +Run: + +```bash +corepack pnpm exec vitest run test/resilient-provider.test.ts test/fallback-chain.test.ts +``` + +Expected: fail because provider context, budget guard, and typed error are missing. + +- [ ] **Step 3: Extend provider interface** + +Update `MemoryProvider`: + +```ts +compress( + systemPrompt: string, + userPrompt: string, + context?: ProviderCallContext, +): Promise; +summarize( + systemPrompt: string, + userPrompt: string, + context?: ProviderCallContext, +): Promise; +``` + +Concrete providers do not need to consume the third argument. + +- [ ] **Step 4: Add guard types and guard factory** + +In `src/functions/session-budget.ts`, export: + +```ts +export interface ProviderBudgetGuard { + reserve(input: { + sessionId?: string; + operation: SessionBudgetOperation; + tokens: number; + providerName: string; + sourceFunction?: string; + purpose?: string; + }): Promise; + record(input: { + sessionId?: string; + reservationId?: string; + operation: SessionBudgetOperation; + outputTokens: number; + success: boolean; + providerName: string; + sourceFunction?: string; + purpose?: string; + }): Promise; +} + +export function createSessionBudgetGuard( + sdk: Pick, +): ProviderBudgetGuard +``` + +The guard must call only `sdk.trigger()` for `mem::session-budget-reserve` and `mem::session-budget-record`. + +- [ ] **Step 5: Enforce in `ResilientProvider`** + +Add: + +```ts +setBudgetGuard(guard: ProviderBudgetGuard): void { + this.budgetGuard = guard; +} +``` + +Flow: + +1. If breaker open, throw `circuit_breaker_open` before reserve. +2. Apply output language directive. +3. Estimate prompt tokens. +4. Reserve. +5. Throw `SessionTokenBudgetExceededError` on denied reservation without breaker failure. +6. Call inner provider through circuit breaker. +7. Record success output tokens. +8. On provider error after reservation, record failure with `outputTokens: 0`, then rethrow. + +- [ ] **Step 6: Forward context in fallback chain** + +Update `FallbackChainProvider.compress()` and `.summarize()` so the callback receives the optional context and passes it to each concrete provider. + +- [ ] **Step 7: Run tests and verify GREEN** + +Run: + +```bash +corepack pnpm exec vitest run test/resilient-provider.test.ts test/fallback-chain.test.ts +``` + +Expected: pass. + +## Task 4: Register Budget Functions And Wire Runtime + +**Files:** + +- Modify: `src/index.ts` +- Modify: `src/triggers/events.ts` +- Test: `test/reconnect-registration.test.ts` +- Test: `test/events-boundary.test.ts` + +- [ ] **Step 1: Write failing registration tests** + +Add assertions that `src/index.ts` imports/registers `registerSessionBudgetFunctions`, calls `provider.setBudgetGuard(createSessionBudgetGuard(sdk))`, and replays this registration through reconnect. + +Add event test: + +```ts +it("initializes budget on durable session start", async () => { + const { sdk } = setupEvents(); + await sdk.trigger("event::session::started", { + sessionId: "ses_1", + project: "git:repo", + cwd: "/repo", + }); + expect(sdk.triggerCalls).toContainEqual(expect.objectContaining({ + function_id: "mem::session-budget-init", + payload: expect.objectContaining({ sessionId: "ses_1" }), + })); +}); +``` + +- [ ] **Step 2: Run tests and verify RED** + +Run: + +```bash +corepack pnpm exec vitest run test/reconnect-registration.test.ts test/events-boundary.test.ts +``` + +Expected: fail until registration and event init exist. + +- [ ] **Step 3: Wire runtime** + +In `src/index.ts`: + +- import `registerSessionBudgetFunctions` and `createSessionBudgetGuard`. +- call `registerSessionBudgetFunctions(sdk, kv, config.sessionTokenCap)` before provider-backed functions. +- call `provider.setBudgetGuard(createSessionBudgetGuard(sdk))` after `sdk` exists. +- start an hourly unref'd interval for `mem::session-budget-reap`, mirroring recent-searches sweep. + +- [ ] **Step 4: Initialize durable session budgets** + +In `src/triggers/events.ts`, call `mem::session-budget-init` after writing the session row. Keep context trigger behavior unchanged. + +- [ ] **Step 5: Run tests and verify GREEN** + +Run: + +```bash +corepack pnpm exec vitest run test/reconnect-registration.test.ts test/events-boundary.test.ts +``` + +Expected: pass. + +## Task 5: Session Start API, Health Status, And CLI + +**Files:** + +- Modify: `src/triggers/api.ts` +- Modify: `src/cli.ts` +- Test: `test/api-session-start.test.ts` +- Test: `test/api-boundary-coverage.test.ts` +- Test: `test/session-end-triggers-graph.test.ts` + +- [ ] **Step 1: Write failing API/CLI tests** + +Add to `test/api-session-start.test.ts`: + +```ts +it("accepts positive sessionTokenCap and initializes the budget", async () => { + const { sdk, kv } = setupApi(); + const res = await sdk.trigger("api::session::start", { + body: { + sessionId: "ses_budget", + project: "/tmp/project", + cwd: "/tmp/project", + sessionTokenCap: 25000, + }, + }); + expect(res).toMatchObject({ + status_code: 200, + body: { budget: expect.objectContaining({ enabled: true }) }, + }); + expect(await kv.get(KV.sessionBudgets, "ses_budget")).toMatchObject({ + capTokens: 25000, + }); +}); + +it("rejects invalid sessionTokenCap values", async () => { + const { sdk } = setupApi(); + await expect(sdk.trigger("api::session::start", { + body: { sessionId: "ses_bad", project: "/tmp", cwd: "/tmp", sessionTokenCap: "100" }, + })).resolves.toMatchObject({ + status_code: 400, + body: { error: expect.stringContaining("sessionTokenCap") }, + }); +}); +``` + +Add static CLI test in `test/session-end-triggers-graph.test.ts`: + +```ts +it("status prints session budget counts from health", () => { + expect(cli).toMatch(/healthRes\?\.sessionBudgets/); + expect(cli).toMatch(/near cap/); + expect(cli).not.toMatch(/session-budgets\/status/); +}); +``` + +- [ ] **Step 2: Run tests and verify RED** + +Run: + +```bash +corepack pnpm exec vitest run test/api-session-start.test.ts test/api-boundary-coverage.test.ts test/session-end-triggers-graph.test.ts +``` + +Expected: fail because `sessionTokenCap`, health budget counts, and CLI output are missing. + +- [ ] **Step 3: Update `api::session::start`** + +Whitelist `sessionTokenCap`: + +- absent: use default/global behavior. +- positive integer: pass as override. +- `0`, string, float, negative: HTTP 400 for API override. + +After `kv.set(KV.sessions, ...)`, trigger `mem::session-budget-init` and include `budget` in response. + +- [ ] **Step 4: Update `api::health`** + +Trigger `mem::session-budget-status` and include: + +```ts +sessionBudgets: { + enabled: boolean; + total: number; + active: number; + nearCap: number; + exhausted: number; + system?: unknown; +} +``` + +Do not add a new REST endpoint. + +- [ ] **Step 5: Update `agentmemory status`** + +In `src/cli.ts`, read `healthRes?.sessionBudgets` and print either: + +```text +Budgets: disabled +``` + +or: + +```text +Budgets: 12 active, 3 near cap, 1 exhausted +``` + +- [ ] **Step 6: Run tests and verify GREEN** + +Run: + +```bash +corepack pnpm exec vitest run test/api-session-start.test.ts test/api-boundary-coverage.test.ts test/session-end-triggers-graph.test.ts +``` + +Expected: pass. + +## Task 6: Compression Fallback And Context Propagation + +**Files:** + +- Modify: `src/eval/self-correct.ts` +- Modify: `src/functions/compress.ts` +- Create: `test/compress-budget.test.ts` +- Test: `test/resilient-provider.test.ts` + +- [ ] **Step 1: Write failing compression fallback test** + +Create `test/compress-budget.test.ts`: + +```ts +it("stores synthetic compression when provider budget is exhausted", async () => { + const provider = { + name: "budgeted", + compress: vi.fn(async () => { + throw new SessionTokenBudgetExceededError("ses_1", 10, 10, 0, "compress"); + }), + summarize: vi.fn(), + } satisfies MemoryProvider; + const { sdk, kv } = setupCompress(provider); + const raw = makeRawObservation("ses_1"); + + const result = await sdk.trigger("mem::compress", { + observationId: raw.id, + sessionId: raw.sessionId, + raw, + }); + + expect(result).toMatchObject({ + success: true, + fallback: "session_token_budget_exhausted", + }); + expect(await kv.get(KV.observations("ses_1"), raw.id)).toMatchObject({ + id: raw.id, + sessionId: "ses_1", + title: expect.any(String), + }); +}); +``` + +- [ ] **Step 2: Run test and verify RED** + +Run: + +```bash +corepack pnpm exec vitest run test/compress-budget.test.ts +``` + +Expected: fail because fallback behavior is missing. + +- [ ] **Step 3: Forward provider context through retry** + +Update `compressWithRetry(provider, systemPrompt, userPrompt, validator, retries, context?)` and pass context to both `provider.compress()` calls. + +- [ ] **Step 4: Pass context and synthesize fallback** + +In `mem::compress`, pass: + +```ts +{ sessionId: data.sessionId, sourceFunction: "mem::compress", purpose: "observation-compress" } +``` + +Catch `SessionTokenBudgetExceededError`, build `buildSyntheticCompression(data.raw)`, write/index/stream it with the same synthetic path shape used by `observe.ts`, and return `success: true` with `fallback: "session_token_budget_exhausted"`. + +- [ ] **Step 5: Run tests and verify GREEN** + +Run: + +```bash +corepack pnpm exec vitest run test/compress-budget.test.ts test/resilient-provider.test.ts +``` + +Expected: pass. + +## Task 7: Summarize Fallback And Chunk Context + +**Files:** + +- Modify: `src/functions/summarize.ts` +- Test: `test/summarize.test.ts` + +- [ ] **Step 1: Write failing summarize fallback tests** + +Add tests: + +```ts +it("stores a deterministic summary when budget is exhausted", async () => { + const provider: MemoryProvider = { + name: "budgeted", + compress: vi.fn(), + summarize: vi.fn(async () => { + throw new SessionTokenBudgetExceededError("ses_budget", 10, 10, 0, "summarize"); + }), + }; + const { handler, kv } = await setupHandler({ + sessionId: "ses_budget", + obsCount: 3, + provider, + }); + + const result = await handler({ sessionId: "ses_budget" }); + + expect(result).toMatchObject({ + success: true, + fallback: "session_token_budget_exhausted", + summary: expect.objectContaining({ + sessionId: "ses_budget", + narrative: expect.stringContaining("token budget"), + }), + }); + expect(await kv.get("summaries", "ses_budget")).toMatchObject({ + sessionId: "ses_budget", + }); +}); + +it("passes session context to chunk and reduce provider calls", async () => { + process.env.SUMMARIZE_CHUNK_SIZE = "1"; + const provider = makeProvider([ + summaryXml({ title: "one" }), + summaryXml({ title: "two" }), + summaryXml({ title: "merged" }), + ]); + const { handler } = await setupHandler({ sessionId: "ses_chunks", obsCount: 2, provider }); + await handler({ sessionId: "ses_chunks" }); + expect(provider.calls.every((c) => c.context?.sessionId === "ses_chunks")).toBe(true); +}); +``` + +- [ ] **Step 2: Run tests and verify RED** + +Run: + +```bash +corepack pnpm exec vitest run test/summarize.test.ts +``` + +Expected: fail until context and fallback are implemented. + +- [ ] **Step 3: Add deterministic summary helper** + +Add `buildSyntheticSessionSummary(session, compressed)`: + +- title from `session.firstPrompt`, `session.summary`, or `Session summary unavailable: token budget exhausted`. +- narrative must explicitly say LLM summarization was skipped because the session token budget was exhausted. +- files/concepts from unique observation fields, capped. +- decisions from decision observations/facts only; do not invent decisions. + +- [ ] **Step 4: Pass context through summarize calls** + +Pass provider context to: + +- single summary call. +- chunk summary retry calls. +- reduce call. + +Budget errors must not be retried repeatedly like parse failures. + +- [ ] **Step 5: Run tests and verify GREEN** + +Run: + +```bash +corepack pnpm exec vitest run test/summarize.test.ts +``` + +Expected: pass. + +## Task 8: Other Provider Call Sites + +**Files:** + +- Modify: `src/functions/sliding-window.ts` +- Modify: `src/functions/skill-extract.ts` +- Modify: `src/functions/crystallize.ts` +- Modify: `src/functions/graph.ts` +- Modify: `src/functions/temporal-graph.ts` +- Modify: `src/functions/consolidate.ts` +- Modify: `src/functions/consolidation-pipeline.ts` +- Modify: `src/functions/query-expansion.ts` +- Modify: `src/functions/compress-file.ts` +- Modify: `src/functions/flow-compress.ts` +- Modify: `src/functions/reflect.ts` +- Existing tests in touched areas. + +- [ ] **Step 1: Write/adjust focused context tests where existing harnesses are cheap** + +Add assertions in existing tests that known session call sites pass context: + +- `test/sliding-window.test.ts`: provider call context includes `sessionId`. +- `test/skill-extract.test.ts`: provider call context includes `sessionId`. +- `test/crystallize.test.ts`: when payload has `sessionId`, provider context includes it. +- `test/temporal-graph.test.ts` or graph tests: single-session observations pass context; mixed sessions omit context. + +- [ ] **Step 2: Run tests and verify RED** + +Run the touched focused tests: + +```bash +corepack pnpm exec vitest run test/sliding-window.test.ts test/skill-extract.test.ts test/crystallize.test.ts test/temporal-graph.test.ts test/graph.test.ts +``` + +Expected: fail where context is not yet passed. + +- [ ] **Step 3: Pass context where session is explicit** + +Apply the arena synthesis rule: + +- explicit single session: pass `sessionId`, `sourceFunction`, and `purpose`. +- mixed/background/no-session: omit context and rely on system sentinel. + +- [ ] **Step 4: Run tests and verify GREEN** + +Run: + +```bash +corepack pnpm exec vitest run test/sliding-window.test.ts test/skill-extract.test.ts test/crystallize.test.ts test/temporal-graph.test.ts test/graph.test.ts +``` + +Expected: pass. + +## Task 9: Docs And Examples + +**Files:** + +- Modify: `.env.example` +- Modify: `README.md` + +- [ ] **Step 1: Add docs checks** + +Search target areas: + +```bash +rg -n "AGENTMEMORY_OUTPUT_LANG|TOKEN_BUDGET|AGENTMEMORY_AUTO_COMPRESS|configuration" README.md .env.example +``` + +- [ ] **Step 2: Document config** + +In `.env.example`, near provider/generated text settings: + +```sh +# AGENTMEMORY_SESSION_TOKEN_CAP=100000 # Estimated LLM tokens per session before hard block. Set 0 to disable. +``` + +In README config section, add: + +```md +Set `AGENTMEMORY_SESSION_TOKEN_CAP` to cap estimated LLM tokens used by background session compression/summarization. The default is `100000`; set it to `0` to disable enforcement. Session-start clients may provide a positive `sessionTokenCap` override for one session. `agentmemory status` reports active, near-cap, and exhausted budget counts. +``` + +Clarify this is separate from `TOKEN_BUDGET`, which controls context injection size. + +- [ ] **Step 3: Verify docs references** + +Run: + +```bash +rg -n "AGENTMEMORY_SESSION_TOKEN_CAP|sessionTokenCap|TOKEN_BUDGET" README.md .env.example +``` + +Expected: new references present and no claim of exact billing-grade token usage. + +## Task 10: Focused Simplification And Verification + +**Files:** + +- All touched source/tests/docs. +- Task record: `docs/todos/2026-06-19-issue-311-session-token-budget/todo.md` + +- [ ] **Step 1: Focused simplification pass** + +Review touched source for: + +- duplicate token-estimation helpers. +- direct KV access from provider code. +- broad API surface additions. +- prompt/response logging. +- budget block counted as circuit-breaker failure. +- fallback summaries that imply LLM generation. + +- [ ] **Step 2: Run targeted tests** + +Run: + +```bash +corepack pnpm exec vitest run test/session-budget.test.ts test/resilient-provider.test.ts test/api-session-start.test.ts test/compress-budget.test.ts test/summarize.test.ts test/session-end-triggers-graph.test.ts test/api-boundary-coverage.test.ts +``` + +Expected: all targeted tests pass. + +- [ ] **Step 3: Run project-native gates** + +Run: + +```bash +corepack pnpm test +corepack pnpm run lint +corepack pnpm run build +``` + +Expected: all pass. If pnpm ignored-build hardening blocks a command, run `corepack pnpm install --frozen-lockfile --ignore-scripts` per AGENTS.md, then retry. + +- [ ] **Step 4: Run security gates** + +Run: + +```bash +semgrep scan --config p/default --error --metrics=off . +``` + +Expected: 0 blocking findings. + +After staging only task-owned files: + +```bash +gitleaks protect --staged --redact +``` + +Expected: no leaks. + +OSV is not required unless dependency, lockfile, container, vendored, or third-party surfaces change. + +- [ ] **Step 5: Update task record** + +Update `todo.md` with: + +- implementation summary. +- verification evidence. +- security gate evidence. +- residual risks. +- Sprint Contract and Feature / Verification Matrix status. + +## Task 11: GitHub Push Prep + +**Files:** + +- Task-owned source/test/doc files. +- Task record and plan. + +- [ ] **Step 1: Invoke `$github-push-prepare`** + +Pass: + +- task record path: `docs/todos/2026-06-19-issue-311-session-token-budget/todo.md` +- plan path: `docs/todos/2026-06-19-issue-311-session-token-budget/plan.md` +- security-sensitive surfaces: persistence/schema, API boundary, provider runtime, config, audit, telemetry/status. +- whether fetch/push/PR creation has explicit current-turn approval. + +- [ ] **Step 2: Respect remote-write gates** + +Do not fetch, push, create PR, merge PR, close issue, or archive thread unless current-turn authorization covers that exact remote/project state change or a narrower tool prompt is approved. + +## Self-Review Notes + +- Spec coverage: all issue acceptance items are mapped to tasks except OTEL histogram naming, which is covered under Task 2/3/telemetry only if implementation needs explicit metrics beyond registered function telemetry. +- Placeholder scan: no unresolved placeholder markers are intentionally left in the plan. +- Type consistency: use `SessionBudget`, `SessionBudgetReservation`, `ProviderCallContext`, `SessionTokenBudgetExceededError`, `SYSTEM_SESSION_BUDGET_ID`, `sessionTokenCap`, and `AGENTMEMORY_SESSION_TOKEN_CAP` consistently. +- Scope control: no MCP tools and no new REST endpoint in first pass; budget counts ride on health. diff --git a/docs/todos/2026-06-19-issue-311-session-token-budget/todo.md b/docs/todos/2026-06-19-issue-311-session-token-budget/todo.md new file mode 100644 index 00000000..800e879e --- /dev/null +++ b/docs/todos/2026-06-19-issue-311-session-token-budget/todo.md @@ -0,0 +1,154 @@ +# Issue 311 Session Token Budget + +Task id: `2026-06-19-issue-311-session-token-budget` + +## Scope + +- Repository: `/Users/A1538552/.codex/worktrees/5887/agentmemory` +- Branch: `issue/311-session-token-budget` +- Issue: GitHub issue #311, imported from upstream issue #767 +- Target remote: `origin` at `https://github.com/wbugitlab1/agentmemory.git` +- Parent batch record: `/Users/A1538552/_projects/_tools/agentmemory/docs/todos/2026-06-19-issue-triage-batch-288-312/todo.md` + +## Sprint Contract + +- Goal: add a per-session LLM token budget with a soft warning threshold and hard block for future LLM calls after exhaustion. +- Scope: + - session-budget state and types under iii-engine KV. + - session budget init/record/read/reap functions. + - session id propagation for LLM calls. + - provider-level budget gate for `compress()` and `summarize()` paths. + - session start override and global `AGENTMEMORY_SESSION_TOKEN_CAP` default. + - status visibility for active, near-cap, and exhausted sessions. + - focused tests for budget init, recording, soft warn, hard cap, concurrent recording, system sentinel behavior, and status counts. +- Non-goals: + - no dependency additions. + - no direct SQLite bypass or non-iii state backend. + - no force-push, rebase, or `upstream` writes. + - no exact provider billing reconciliation unless the provider interface is explicitly approved to expose token usage. +- Acceptance criteria: + - issue validity is checked against current fork evidence. + - any new KV scope is added to `src/state/schema.ts` and typed in `src/types.ts`. + - all state-changing functions validate input and use existing audit/state patterns. + - soft warning fires once when budget usage reaches 80 percent. + - hard cap blocks future LLM calls for the same session and returns synthetic/no-LLM output where the calling layer can do so. + - session forks get fresh budget rows because budgets are keyed by session id. + - system-triggered/no-session calls use a sentinel budget scope. + - `agentmemory status` shows active, near-cap, and exhausted budget counts. + - tests prove the changed behavior before implementation claims. +- Intended verification: + - targeted red/green vitest coverage for the budget module, provider gate, session start API, summarize fallback, and CLI status text. + - `corepack pnpm test`. + - `corepack pnpm run lint`. + - `corepack pnpm run build`. + - Semgrep for code/config/persistence/API changes. + - staged Gitleaks before any commit. +- Known boundaries: + - The requested feature introduces persisted budget state and may add externally consumed API/function surface. + - The current `MemoryProvider` interface returns strings only; exact token accounting is unavailable without an interface expansion. Estimated accounting from prompt/response text stays smaller but is less precise. + - The delegation requires a Human Checkpoint for API, persistence, schema, dependency, architecture, or remote/project-policy changes. This issue inherently touches persistence/schema and may touch API/interface surface. +- Stop conditions: + - Human Checkpoint is not granted for the persistence/schema/API/interface boundary. + - local evidence shows the feature is already implemented in this fork. + - arena candidates diverge on incompatible architecture choices. + - verification or required security gates produce unresolved blocking findings. + +## Issue Validity + +- Issue #311 is open in `wbugitlab1/agentmemory`. +- Local PR search found no PR in `wbugitlab1/agentmemory` matching issue #311 or the session token budget feature. +- Local code search found no `AGENTMEMORY_SESSION_TOKEN_CAP`, session-budget KV scope, session-budget functions, provider budget gate, or status budget counts. +- Existing code has unrelated token-budget support for search/context packing and query execution, but not per-session LLM-call budgeting. +- Conclusion: valid implementation issue for this fork, subject to the Human Checkpoint above. + +## Feature / Verification Matrix + +| Change | Verification method | Status | Evidence | +| --- | --- | --- | --- | +| Confirm branch/worktree context | Local git commands | Done | Worktree `/Users/A1538552/.codex/worktrees/5887/agentmemory` was detached at `499b53fc`; branch `issue/311-session-token-budget` created from `origin/main` at the same commit. | +| Validate issue legitimacy | GitHub issue/PR read and local code search | Done | `gh issue view 311`, `gh pr list --search ...`, and `rg` for budget/session terms inspected. | +| Design implementation shape with arena | Three read-only candidates plus judge | Done | Candidate 2 selected as base; Candidate 3 reservation/accounting tests and Candidate 1 circuit-breaker/source-context details grafted. Synthesis: `docs/todos/2026-06-19-issue-311-session-token-budget/arena-synthesis.md`; judge: `/tmp/arena-issue-311-session-token-budget/judge.md`. | +| Persistence/API checkpoint | Human confirmation | Done | User replied `weiter` after the checkpoint; proceed within issue #311 acceptance boundaries. | +| Implementation plan | `github-feature-loop` / `writing-plans` | Done | Plan saved to `docs/todos/2026-06-19-issue-311-session-token-budget/plan.md`; placeholder scan passed. | +| Pre-implementation plan review | Three read-only reviewers | Done | Accepted findings: use ALS, exact event/OTEL acceptance, implicit-session init, chunked summarize fallback, behavior-level CLI/API tests. | +| Implement session budget | Red/green vitest | Done | Added session-budget KV/types/functions, ALS propagation, provider guard, health/status visibility, docs, and focused tests. Targeted budget suite passed: 9 files, 52 tests. | +| Final verification and GitHub flow | Project-native tests, security gates, push/PR/merge | Verified locally | `lint`, `build`, `skills:check`, plugin surface contract, focused budget tests, focused flaky-test repros, Semgrep, serial full-suite Vitest, and fresh standard `corepack pnpm test` passed. After merging `origin/main`, `lint`, `build`, `skills:check`, and standard `corepack pnpm test` passed again. | + +## Arena Ledger + +| Workstream | Scope | Edits allowed | Expected output | Result | Residual risk | +| --- | --- | --- | --- | --- | --- | +| Candidate 1 | Read-only architecture design for issue #311 | No production edits; may write `/tmp/arena-issue-311-session-token-budget/candidate-1/solution.md` | Implementation shape and rationale | Done | Strong support for circuit-breaker-before-budget ordering and `sourceFunction` context; broader provider construction and `describeImage` scope rejected for first pass. | +| Candidate 2 | Read-only architecture design for issue #311 | No production edits; may write `/tmp/arena-issue-311-session-token-budget/candidate-2/solution.md` | Implementation shape and rationale | Done | Selected as base; best iii/StateKV fit through registered budget functions and `sdk.trigger()` guard with minimal REST/MCP surface. | +| Candidate 3 | Read-only architecture design for issue #311 | No production edits; may write `/tmp/arena-issue-311-session-token-budget/candidate-3/solution.md` | Implementation shape and rationale | Done | Graft reservation/failure-settlement behavior, stale-reservation cleanup, typed hard-block error, and concurrency tests; reject reserving before circuit-breaker check. | +| Cross-judge | Read-only rubric scoring | No edits | Scores, base recommendation, and risks | Done | Judge scored Candidate 2 24/25, Candidate 3 22/25, Candidate 1 20/25 and recommended the same grafts recorded in `arena-synthesis.md`. | + +## Progress + +- 2026-06-19: Read `AGENTS.md`, `using-superpowers`, `arena`, `github-feature-loop`, `brainstorming`, `test-driven-development`, `writing-plans`, `review-and-implement`, and `verification-before-completion`. +- 2026-06-19: Confirmed worktree path, remotes, and worktree list. `origin` is `https://github.com/wbugitlab1/agentmemory.git`; `upstream` is present but out of scope. +- 2026-06-19: `git status -sb --untracked-files=all` showed detached clean `HEAD`; `HEAD` and `origin/main` both resolved to `499b53fc4a0f58d6f7b2daf674a7943de023d75a`. +- 2026-06-19: No local or remote-tracking `issue/311-session-token-budget` ref existed. Created and switched to `issue/311-session-token-budget`. +- 2026-06-19: Read issue #311 via `gh issue view`; issue is open and requests per-session token caps with soft warn and hard block. +- 2026-06-19: Local PR search for #311/session token budget returned no matching PR in `wbugitlab1/agentmemory`. +- 2026-06-19: Local code search found no existing session-budget implementation. Existing token-budget hits are unrelated context/search/query budgets. +- 2026-06-19: Read parent batch record from the parent checkout; it confirms #311 was assigned to this branch/worktree and target `origin/main`. +- 2026-06-19: Inspected provider/session/status surfaces: `src/providers/resilient.ts`, `src/functions/observe.ts`, `src/functions/compress.ts`, `src/functions/summarize.ts`, `src/triggers/api.ts`, `src/triggers/events.ts`, `src/index.ts`, `src/state/schema.ts`, `src/types.ts`, and status tests. +- 2026-06-19: Ran required arena as read-only architecture design. Three candidate artifacts and one cross-judge artifact were written under `/tmp/arena-issue-311-session-token-budget/`. No production implementation files were edited by candidates. +- 2026-06-19: Arena synthesis saved to `docs/todos/2026-06-19-issue-311-session-token-budget/arena-synthesis.md`. Selected Candidate 2 as base; grafted Candidate 3 reservation/accounting tests and Candidate 1 circuit-breaker/source-context details. +- 2026-06-19: Arena verification confirmed all candidate and judge artifacts exist. `git status -sb --untracked-files=all` shows only task-state files under `docs/todos/2026-06-19-issue-311-session-token-budget/`. +- 2026-06-19: Stopped before production implementation for Human Checkpoint on persistence/schema/API/provider-interface behavior. +- 2026-06-19: User invoked `$github-feature-loop`. Read the skill plus `writing-plans`, `review-and-implement`, and `verification-before-completion`; rechecked `AGENTS.md` and `git status`. +- 2026-06-19: Wrote implementation plan to `docs/todos/2026-06-19-issue-311-session-token-budget/plan.md`. The plan carries forward the latest default policy recommendation: unset/blank `AGENTMEMORY_SESSION_TOKEN_CAP` defaults to `100000`, and `0` disables enforcement. +- 2026-06-19: Plan self-review placeholder scan passed with `rg -n "TBD|TODO|implement later|fill in details|Add appropriate|Write tests for the above|Similar to" docs/todos/2026-06-19-issue-311-session-token-budget/plan.md` returning no matches. +- 2026-06-19: User replied `weiter`, treated as the checkpoint approval to proceed with persistence/schema/API/provider-guard implementation inside issue scope. +- 2026-06-19: Ran three read-only pre-implementation reviewers. Accepted high/medium findings: issue requires `AsyncLocalStorage` session propagation, `event::mem::budget::soft-warned`, `event::mem::budget::exhausted`, OTEL histogram `agentmemory.session.tokens_used`, implicit-session budget initialization, chunked summarize budget fallback coverage, API session-start harness updates, and behavior-level CLI status coverage. +- 2026-06-19: Updated `plan.md` with accepted review corrections. The corrected architecture uses exact KV scope `mem:session-budget`, ALS context, budget init/record functions, budget events, OTEL histogram, and deterministic no-LLM fallback with `truncated: true` summaries on budget exhaustion. +- 2026-06-19: Implemented session token budget support: + - Added `src/session-context.ts` for AsyncLocalStorage session propagation and a system sentinel. + - Added `src/functions/session-budget.ts` with `mem::session::budget::init`, `mem::session::budget::record`, an internal timer-backed reap function, soft/hard budget events, audit records, and `agentmemory.session.tokens_used`. + - Wired provider-level budget checks into `ResilientProvider` for `compress` and `summarize`, including 0/0 failed-call accounting. + - Initialized budgets from REST and durable session-start paths plus implicit observe-created sessions. + - Added deterministic no-LLM fallbacks for compression and summaries when the hard cap is exhausted; summaries persist `truncated: true`. + - Exposed active/near-cap/exhausted counts through health and `agentmemory status`. + - Documented `AGENTMEMORY_SESSION_TOKEN_CAP`, default `100000`, disable value `0`, and per-session `sessionTokenCap`. +- 2026-06-19: Targeted verification passed: + - `corepack pnpm exec vitest run test/session-context.test.ts test/session-budget.test.ts test/telemetry-session-budget.test.ts test/resilient-provider.test.ts test/api-session-start.test.ts test/observe-implicit-session.test.ts test/summarize.test.ts test/compress-budget.test.ts test/cli-status-budget.test.ts` passed after implementation: 9 files, 52 tests. + - `corepack pnpm run lint` passed. + - `corepack pnpm run build` passed with existing tsdown warning noise only. + - `corepack pnpm run skills:gen` regenerated `plugin/skills/agentmemory-config/REFERENCE.md` after the new env var doc change. + - `corepack pnpm exec vitest run test/plugin-surface-contract.test.ts` passed. + - `corepack pnpm run skills:check` passed. + - `semgrep scan --config p/default --error --metrics=off .` passed with 0 findings. + - `git diff --check` passed. + - Changed full-suite approach `corepack pnpm exec vitest run --exclude test/integration.test.ts --no-file-parallelism --maxWorkers=1` passed: 212 test files, 2851 tests. +- 2026-06-20: User requested repeating verification until the standard full suite is green. Fresh `corepack pnpm test` passed with 212 test files and 2851 tests. `git diff --check` also passed after the run. +- Prior full-suite failures resolved before remote flow: + - First post-implementation `corepack pnpm test` failed only because `test/plugin-surface-contract.test.ts` detected the generated skill reference was stale; this was fixed with `skills:gen` and a passing focused plugin-surface run. + - Next `corepack pnpm test` failed under load in `test/codex-sdk-provider.test.ts` with a 2000 ms timeout. Focused repro `corepack pnpm exec vitest run test/codex-sdk-provider.test.ts` passed 3/3. + - Next `corepack pnpm test` failed under load in `test/codex-sdk-provider.test.ts` and `test/hook-source-smoke.test.ts` with timeouts. Focused repro `corepack pnpm exec vitest run test/hook-source-smoke.test.ts` passed 23/23. + - 2026-06-20 fresh standard full-suite retry passed, resolving the verification caveat. +- 2026-06-20: User requested push, PR, and merge. Fetched `origin`; `origin/main` resolved to `682a133e66fa1650d62ae460d089b8ec19aaa92b`. Merged `origin/main` into `issue/311-session-token-budget`, resolved conflicts by preserving origin-main failure-audit/session-reaper behavior and Issue 311 session-budget behavior. Post-merge checks passed: + - `corepack pnpm run lint` + - `corepack pnpm run build` + - `corepack pnpm run skills:check` + - `corepack pnpm test` passed with 216 test files and 2950 tests. +- 2026-06-20: Main advanced while PR #1028 CI was running. Merged `origin/main` at `be1b0094ff417d75b1bc6a32be3ae38c0ebca200`; checks passed: + - `corepack pnpm run lint` + - `corepack pnpm run build` + - `corepack pnpm run skills:check` + - `corepack pnpm test` passed with 217 test files and 2968 tests. + - `semgrep scan --config p/default --error --metrics=off .` passed with 0 findings. + - `gitleaks detect --source . --redact --log-opts 4b6ae62a417ba3ef6037baf63a04c1a7b327c7c7..HEAD` passed with no leaks. +- 2026-06-20: Main advanced again to `f497290b131eeb755a6cea90bc47293c1a987eac`. Merged latest `origin/main`; checks passed: + - `corepack pnpm run lint` + - `corepack pnpm run build` + - `corepack pnpm run skills:check` + - `corepack pnpm test` passed with 218 test files and 2986 tests. + - `semgrep scan --config p/default --error --metrics=off .` passed with 0 findings. + - `gitleaks detect --source . --redact --log-opts b3fa65cf06ad2228a3217bac15faab9ff9168522..HEAD` passed with no leaks. + - Full-history `gitleaks detect --source . --redact` reported 14 historical leaks across 1043 commits; range scans for the new merge commits found no new leaks. + +## Current Checkpoint + +Implementation is complete locally, merged with latest observed `origin/main`, and final local verification is green. Next step: push updated branch, wait for PR CI, then clean PR merge. diff --git a/plugin/skills/agentmemory-config/REFERENCE.md b/plugin/skills/agentmemory-config/REFERENCE.md index 22659a27..24c010b7 100644 --- a/plugin/skills/agentmemory-config/REFERENCE.md +++ b/plugin/skills/agentmemory-config/REFERENCE.md @@ -3,7 +3,7 @@ Generated by scanning `src/` for `AGENTMEMORY_*` usage. Do not edit the block below by hand; run `corepack pnpm run skills:gen` after adding or removing a variable. Internal markers ending in two underscores are excluded. -Configuration is read from the environment and from `~/.agentmemory/.env` (no `export` prefix). 80 recognized variables: +Configuration is read from the environment and from `~/.agentmemory/.env` (no `export` prefix). 81 recognized variables: - `AGENTMEMORY_AGENT_ID` - `AGENTMEMORY_AGENT_SCOPE` @@ -74,6 +74,7 @@ Configuration is read from the environment and from `~/.agentmemory/.env` (no `e - `AGENTMEMORY_SDK_CHILD` - `AGENTMEMORY_SECRET` - `AGENTMEMORY_SESSION_ID` +- `AGENTMEMORY_SESSION_TOKEN_CAP` - `AGENTMEMORY_SHUTDOWN_TIMEOUT_MS` - `AGENTMEMORY_SLOTS` - `AGENTMEMORY_SUPPRESS_COST_WARNING` diff --git a/src/cli.ts b/src/cli.ts index 2dffc9fb..40310ff7 100644 --- a/src/cli.ts +++ b/src/cli.ts @@ -43,6 +43,7 @@ import { renderSplash } from "./cli/splash.js"; import { isFirstRun, readPrefs, resetPrefs, writePrefs } from "./cli/preferences.js"; import { runOnboarding } from "./cli/onboarding.js"; import { buildReadyWebSocketUrls } from "./cli/ready-hint.js"; +import { renderSessionBudgetStatus } from "./cli/status-render.js"; import { buildImportJsonlRequestHeaders, buildJsonRequestHeaders, @@ -1630,9 +1631,12 @@ async function runStatus() { p.log.success(`Connected — v${version} at ${base}`); + const budgetStatusLine = renderSessionBudgetStatus( + healthRes?.sessionBudgets, + ); const lines = [ `Health: ${status === "healthy" ? "✓ healthy" : status}`, - `Sessions: ${sessions}`, + budgetStatusLine ?? `Sessions: ${sessions}`, `Observations: ${obsCount}`, `Memories: ${memCount}`, `Graph: ${nodes} nodes, ${edges} edges`, diff --git a/src/cli/status-render.ts b/src/cli/status-render.ts new file mode 100644 index 00000000..c57cbcfc --- /dev/null +++ b/src/cli/status-render.ts @@ -0,0 +1,8 @@ +import type { SessionBudgetSummary } from "../types.js"; + +export function renderSessionBudgetStatus( + summary: SessionBudgetSummary | null | undefined, +): string | null { + if (!summary?.enabled) return null; + return `Sessions: ${summary.active} active, ${summary.nearCap} near-cap, ${summary.exhausted} exhausted`; +} diff --git a/src/config.ts b/src/config.ts index ba305de1..2e4adb8f 100644 --- a/src/config.ts +++ b/src/config.ts @@ -27,6 +27,7 @@ function safeParseInt(value: string | undefined, fallback: number): number { } const MAX_TCP_PORT = 65535; +export const DEFAULT_SESSION_TOKEN_CAP = 100_000; function parsePort(value: string | undefined): number | null { if (!value) return null; @@ -41,6 +42,20 @@ function parseStrictPort(value: string | undefined): number | null { return parsePort(value); } +export function getSessionTokenCap( + env: Record = getMergedEnv(), +): number { + const raw = env["AGENTMEMORY_SESSION_TOKEN_CAP"]; + if (raw === undefined || raw.trim() === "") return DEFAULT_SESSION_TOKEN_CAP; + const trimmed = raw.trim(); + if (trimmed === "0") return 0; + if (!/^\d+$/.test(trimmed)) return DEFAULT_SESSION_TOKEN_CAP; + const parsed = Number(trimmed); + return Number.isSafeInteger(parsed) && parsed > 0 + ? parsed + : DEFAULT_SESSION_TOKEN_CAP; +} + function parseRestAnchor(value: string | undefined, fallback: number): number { const parsed = parsePort(value); return parsed !== null && parsed + 2 <= MAX_TCP_PORT ? parsed : fallback; @@ -386,6 +401,7 @@ export function loadConfig(): AgentMemoryConfig { viewerPort, provider, tokenBudget: safeParseInt(env["TOKEN_BUDGET"], 2000), + sessionTokenCap: getSessionTokenCap(env), maxObservationsPerSession: safeParseInt(env["MAX_OBS_PER_SESSION"], 500), compressionModel: provider.compressModel ?? provider.model, dataDir: DATA_DIR, diff --git a/src/functions/compress.ts b/src/functions/compress.ts index 2b1a02cc..0ba7c639 100644 --- a/src/functions/compress.ts +++ b/src/functions/compress.ts @@ -16,10 +16,13 @@ import { import { VISION_DESCRIPTION_PROMPT } from "../prompts/vision.js"; import { getXmlTag, getXmlChildren } from "../prompts/xml.js"; import { getSearchIndex, vectorIndexAddGuarded } from "./search.js"; +import { buildSyntheticCompression } from "./compress-synthetic.js"; import { CompressOutputSchema } from "../eval/schemas.js"; import { validateOutput } from "../eval/validator.js"; import { scoreCompression } from "../eval/quality.js"; import { compressWithRetry } from "../eval/self-correct.js"; +import { runWithSessionContext } from "../session-context.js"; +import { isSessionTokenBudgetExceededError } from "./session-budget.js"; import type { MetricsStore } from "../eval/metrics-store.js"; import { auditLlmFailure, isAuditRetryPayload } from "./audit-failures.js"; import { logger } from "../logger.js"; @@ -71,13 +74,20 @@ export function registerCompressFunction( provider: MemoryProvider, metricsStore?: MetricsStore, ): void { - sdk.registerFunction("mem::compress", + sdk.registerFunction( + "mem::compress", async (data: { observationId: string; sessionId: string; raw: RawObservation; __agentmemoryAuditRetry?: boolean; - }) => { + }) => + runWithSessionContext( + { + sessionId: data?.sessionId, + sourceFunction: "mem::compress", + }, + async () => { const startMs = Date.now(); let imageDescription: string | undefined; @@ -265,6 +275,84 @@ export function registerCompressFunction( return { success: true, compressed, qualityScore }; } catch (err) { + if (isSessionTokenBudgetExceededError(err)) { + const compressed: CompressedObservation = { + ...buildSyntheticCompression({ + ...data.raw, + id: data.observationId, + sessionId: data.sessionId, + }), + id: data.observationId, + sessionId: data.sessionId, + timestamp: data.raw.timestamp, + metadata: { + budgetExhausted: true, + budgetError: err.message, + }, + }; + await kv.set( + KV.observations(data.sessionId), + data.observationId, + compressed, + ); + try { + getSearchIndex().add(compressed); + } catch (indexErr) { + logger.warn("Failed to index synthetic budget fallback", { + obsId: compressed.id, + sessionId: compressed.sessionId, + error: + indexErr instanceof Error ? indexErr.message : String(indexErr), + }); + } + await vectorIndexAddGuarded( + compressed.id, + compressed.sessionId, + compressed.title + " " + (compressed.narrative || ""), + { kind: "synthetic", logId: compressed.id }, + ); + await Promise.allSettled([ + sdk.trigger({ + function_id: "stream::set", + payload: { + stream_name: STREAM.name, + group_id: STREAM.group(data.sessionId), + item_id: data.observationId, + data: { type: "compressed", observation: compressed }, + }, + }), + sdk.trigger({ + function_id: "stream::send", + payload: { + stream_name: STREAM.name, + group_id: STREAM.viewerGroup, + id: `compressed-${data.observationId}`, + type: "compressed_observation", + data: { + type: "compressed", + observation: compressed, + sessionId: data.sessionId, + }, + }, + action: TriggerAction.Void(), + }), + ]); + const latencyMs = Date.now() - startMs; + if (metricsStore) { + await metricsStore.record("mem::compress", latencyMs, true, 30); + } + logger.warn("Compression budget exhausted; stored synthetic fallback", { + obsId: data.observationId, + sessionId: data.sessionId, + }); + return { + success: true, + compressed, + qualityScore: 30, + synthetic: true, + reason: "session_budget_exhausted", + }; + } const msg = err instanceof Error ? err.message : String(err); const latencyMs = Date.now() - startMs; if (metricsStore) { @@ -288,6 +376,7 @@ export function registerCompressFunction( } return { success: false, error: "compression_failed" }; } - }, + }, + ), ); } diff --git a/src/functions/consolidation-pipeline.ts b/src/functions/consolidation-pipeline.ts index 0e90ca56..397618d0 100644 --- a/src/functions/consolidation-pipeline.ts +++ b/src/functions/consolidation-pipeline.ts @@ -17,6 +17,7 @@ import { import { recordAudit } from "./audit.js"; import { auditLlmFailure, isAuditRetryPayload } from "./audit-failures.js"; import { getConsolidationDecayDays, isConsolidationEnabled } from "../config.js"; +import { runWithSessionContext } from "../session-context.js"; import { logger } from "../logger.js"; const KV_BATCH_SIZE = 16; @@ -65,8 +66,20 @@ export function registerConsolidationPipelineFunction( kv: StateKV, provider: MemoryProvider, ): void { - sdk.registerFunction("mem::consolidate-pipeline", - async (data?: { tier?: string; force?: boolean; project?: string; __agentmemoryAuditRetry?: boolean }) => { + sdk.registerFunction( + "mem::consolidate-pipeline", + async (data?: { + tier?: string; + force?: boolean; + project?: string; + __agentmemoryAuditRetry?: boolean; + }) => + runWithSessionContext( + { + project: data?.project, + sourceFunction: "mem::consolidate-pipeline", + }, + async () => { const force = data?.force === true; if (!force && !isConsolidationEnabled()) { return { success: false, skipped: true, reason: "Consolidation disabled: set CONSOLIDATION_ENABLED=true or configure an LLM provider (ANTHROPIC_API_KEY / AZURE_OPENAI_API_KEY + AZURE_OPENAI_ENDPOINT + AZURE_OPENAI_DEPLOYMENT / OPENAI_API_KEY / OPENROUTER_API_KEY / GEMINI_API_KEY / GOOGLE_API_KEY / MINIMAX_API_KEY / KIMI_FOR_CODING_API_KEY / DEEPSEEK_API_KEY / ASTRAFLOW_API_KEY / ASTRAFLOW_CN_API_KEY / OPENAI_BASE_URL / AGENTMEMORY_PROVIDER=agent-sdk)" }; @@ -354,7 +367,8 @@ export function registerConsolidationPipelineFunction( logger.info("Consolidation pipeline complete", { tier, results }); return { success: true, results }; - }, + }, + ), ); } diff --git a/src/functions/observe.ts b/src/functions/observe.ts index 72bd9505..2952fdde 100644 --- a/src/functions/observe.ts +++ b/src/functions/observe.ts @@ -9,6 +9,7 @@ import { isAutoCompressEnabled } from "../config.js"; import { buildSyntheticCompression } from "./compress-synthetic.js"; import { getSearchIndex, vectorIndexAddGuarded } from "./search.js"; import { getAgentId } from "../config.js"; +import { runWithSessionContext } from "../session-context.js"; import { logger } from "../logger.js"; export function extractImage(d: unknown): string | undefined { @@ -41,7 +42,14 @@ export function registerObserveFunction( maxObservationsPerSession?: number, ): void { sdk.registerFunction("mem::observe", - async (payload: HookPayload) => { + async (payload: HookPayload) => + runWithSessionContext( + { + sessionId: payload?.sessionId, + project: payload?.project, + sourceFunction: "mem::observe", + }, + async () => { if ( !payload?.sessionId || @@ -277,6 +285,17 @@ export function registerObserveFunction( ? { firstPrompt: trimmedPrompt } : {}), }); + try { + await sdk.trigger({ + function_id: "mem::session::budget::init", + payload: { sessionId: payload.sessionId, source: "implicit" }, + }); + } catch (err) { + logger.warn("session budget init failed for implicit session", { + sessionId: payload.sessionId, + error: err instanceof Error ? err.message : String(err), + }); + } } // Per-observation LLM compression is opt-in as of 0.8.8 (#138). @@ -340,5 +359,6 @@ export function registerObserveFunction( return { observationId: obsId }; }); }, + ), ); } diff --git a/src/functions/session-budget.ts b/src/functions/session-budget.ts new file mode 100644 index 00000000..c58ad3f9 --- /dev/null +++ b/src/functions/session-budget.ts @@ -0,0 +1,457 @@ +import { TriggerAction, type ISdk } from "iii-sdk"; +import { getSessionTokenCap } from "../config.js"; +import { KV } from "../state/schema.js"; +import type { StateKV } from "../state/kv.js"; +import { withKeyedLock } from "../state/keyed-mutex.js"; +import type { + Session, + SessionBudget, + SessionBudgetOperation, + SessionBudgetSource, + SessionBudgetSummary, +} from "../types.js"; +import { + SYSTEM_SESSION_BUDGET_ID, + getSessionContext, +} from "../session-context.js"; +import { safeAudit } from "./audit.js"; +import { getHistograms } from "../telemetry/setup.js"; +import { logger } from "../logger.js"; + +export { SYSTEM_SESSION_BUDGET_ID } from "../session-context.js"; + +const SOFT_WARN_RATIO = 0.8; +const DEFAULT_REAP_RETENTION_DAYS = 30; +const DAY_MS = 24 * 60 * 60 * 1000; + +export class SessionTokenBudgetExceededError extends Error { + constructor( + readonly sessionId: string, + readonly tokensUsed: number, + readonly tokenCap: number, + ) { + super( + `session token budget exhausted for ${sessionId}: ${tokensUsed}/${tokenCap}`, + ); + this.name = "SessionTokenBudgetExceededError"; + } +} + +export function isSessionTokenBudgetExceededError( + err: unknown, +): err is SessionTokenBudgetExceededError { + return err instanceof SessionTokenBudgetExceededError; +} + +export function estimateTokens(...parts: Array): number { + const chars = parts.reduce((sum, part) => sum + (part?.length ?? 0), 0); + return Math.max(1, Math.ceil(chars / 3)); +} + +export interface SessionBudgetCheck { + sessionId: string; + operation: SessionBudgetOperation; + providerName?: string; +} + +export interface SessionBudgetRecord extends SessionBudgetCheck { + inputTokens: number; + outputTokens: number; + failed: boolean; +} + +export interface SessionBudgetGuard { + check(call: SessionBudgetCheck): Promise; + record(call: SessionBudgetRecord): Promise; +} + +let budgetGuard: SessionBudgetGuard | null = null; + +export function setSessionBudgetGuard(guard: SessionBudgetGuard | null): void { + budgetGuard = guard; +} + +export function getSessionBudgetGuard(): SessionBudgetGuard | null { + return budgetGuard; +} + +function normalizeSessionId(sessionId: unknown): string { + const trimmed = typeof sessionId === "string" ? sessionId.trim() : ""; + return trimmed || SYSTEM_SESSION_BUDGET_ID; +} + +function normalizeSource( + source: unknown, + sessionId: string, + tokenCapProvided: boolean, +): SessionBudgetSource { + if (tokenCapProvided) return "session_override"; + if ( + source === "env" || + source === "session_override" || + source === "implicit" || + source === "system" + ) { + return source; + } + return sessionId === SYSTEM_SESSION_BUDGET_ID ? "system" : "env"; +} + +function normalizeCap(tokenCap: unknown): number | null { + if (tokenCap === undefined) return getSessionTokenCap(); + if ( + typeof tokenCap === "number" && + Number.isSafeInteger(tokenCap) && + tokenCap >= 0 + ) { + return tokenCap; + } + return null; +} + +function statusFor(tokenCap: number, tokensUsed: number): SessionBudget["status"] { + if (tokenCap === 0) return "disabled"; + if (tokensUsed >= tokenCap) return "exhausted"; + if (tokensUsed >= Math.ceil(tokenCap * SOFT_WARN_RATIO)) return "near_cap"; + return "active"; +} + +function createBudget( + sessionId: string, + tokenCap: number, + source: SessionBudgetSource, + now: string, +): SessionBudget { + return { + sessionId, + tokenCap, + tokensUsed: 0, + costEstimate: 0, + status: statusFor(tokenCap, 0), + source, + estimated: true, + createdAt: now, + updatedAt: now, + callCount: 0, + blockedCallCount: 0, + failedCallCount: 0, + }; +} + +function nonNegativeInt(value: unknown): number | null { + return typeof value === "number" && + Number.isSafeInteger(value) && + value >= 0 + ? value + : null; +} + +async function emitBudgetEvent( + sdk: ISdk, + functionId: "event::mem::budget::soft-warned" | "event::mem::budget::exhausted", + budget: SessionBudget, +): Promise { + try { + await sdk.trigger({ + function_id: functionId, + payload: { + sessionId: budget.sessionId, + tokenCap: budget.tokenCap, + tokensUsed: budget.tokensUsed, + status: budget.status, + }, + action: TriggerAction.Void(), + }); + } catch (err) { + logger.warn("session budget event trigger failed", { + functionId, + sessionId: budget.sessionId, + error: err instanceof Error ? err.message : String(err), + }); + } +} + +async function getOrCreateBudget( + kv: StateKV, + sessionId: string, + source: SessionBudgetSource, +): Promise { + const existing = await kv.get(KV.sessionBudget, sessionId); + if (existing) return existing; + return createBudget(sessionId, getSessionTokenCap(), source, new Date().toISOString()); +} + +export async function getSessionBudgetSummary( + kv: StateKV, +): Promise { + const budgets = await kv.list(KV.sessionBudget); + const sessionBudgets = budgets.filter( + (budget) => budget.sessionId !== SYSTEM_SESSION_BUDGET_ID, + ); + return { + enabled: getSessionTokenCap() > 0, + active: sessionBudgets.filter((budget) => budget.status === "active") + .length, + nearCap: sessionBudgets.filter((budget) => budget.status === "near_cap") + .length, + exhausted: sessionBudgets.filter((budget) => budget.status === "exhausted") + .length, + total: sessionBudgets.length, + }; +} + +export function createSessionBudgetGuard(sdk: ISdk): SessionBudgetGuard { + return { + async check(call) { + const context = getSessionContext(); + const sessionId = call.sessionId || context.sessionId; + const result = (await sdk.trigger({ + function_id: "mem::session::budget::record", + payload: { + sessionId, + operation: call.operation, + model: call.providerName, + phase: "check", + }, + })) as { + allowed?: boolean; + budget?: Pick; + }; + if (result?.allowed === false) { + throw new SessionTokenBudgetExceededError( + sessionId, + result.budget?.tokensUsed ?? 0, + result.budget?.tokenCap ?? 0, + ); + } + }, + async record(call) { + await sdk.trigger({ + function_id: "mem::session::budget::record", + payload: { + sessionId: call.sessionId, + inputTokens: call.inputTokens, + outputTokens: call.outputTokens, + model: call.providerName, + operation: call.operation, + failed: call.failed, + }, + }); + }, + }; +} + +export function registerSessionBudgetFunctions(sdk: ISdk, kv: StateKV): void { + sdk.registerFunction( + "mem::session::budget::init", + async (data: { + sessionId?: string; + tokenCap?: number; + source?: SessionBudgetSource; + }) => { + const sessionId = normalizeSessionId(data?.sessionId); + if (sessionId === SYSTEM_SESSION_BUDGET_ID) { + return { success: false, error: "sessionId is required" }; + } + const tokenCap = normalizeCap(data?.tokenCap); + if (tokenCap === null) { + return { success: false, error: "tokenCap must be a non-negative integer" }; + } + const source = normalizeSource( + data?.source, + sessionId, + data?.tokenCap !== undefined, + ); + + const budget = await withKeyedLock(`session-budget:${sessionId}`, async () => { + const now = new Date().toISOString(); + const existing = await kv.get(KV.sessionBudget, sessionId); + const next: SessionBudget = existing + ? { + ...existing, + tokenCap, + source, + status: statusFor(tokenCap, existing.tokensUsed), + updatedAt: now, + } + : createBudget(sessionId, tokenCap, source, now); + await kv.set(KV.sessionBudget, sessionId, next); + return next; + }); + + await safeAudit(kv, "session_budget_init", "mem::session::budget::init", [ + sessionId, + ], { + tokenCap: budget.tokenCap, + source: budget.source, + }); + + return { success: true, budget }; + }, + ); + + sdk.registerFunction( + "mem::session::budget::record", + async (data: { + sessionId?: string; + inputTokens?: number; + outputTokens?: number; + model?: string; + operation?: SessionBudgetOperation; + failed?: boolean; + phase?: "check" | "record"; + }) => { + const sessionId = normalizeSessionId(data?.sessionId); + const operation = + data?.operation === "summarize" ? "summarize" : "compress"; + const phase = data?.phase === "check" ? "check" : "record"; + const source = + sessionId === SYSTEM_SESSION_BUDGET_ID ? "system" : "implicit"; + + if (phase === "check") { + const result = await withKeyedLock(`session-budget:${sessionId}`, async () => { + const budget = await getOrCreateBudget(kv, sessionId, source); + if (budget.tokenCap === 0) { + await kv.set(KV.sessionBudget, sessionId, budget); + return { allowed: true, budget }; + } + if (budget.tokensUsed >= budget.tokenCap || budget.exhaustedAt) { + const now = new Date().toISOString(); + const shouldEmit = !budget.exhaustedAt; + const blocked: SessionBudget = { + ...budget, + status: "exhausted", + exhaustedAt: budget.exhaustedAt ?? now, + lastBlockedAt: now, + blockedCallCount: budget.blockedCallCount + 1, + updatedAt: now, + }; + await kv.set(KV.sessionBudget, sessionId, blocked); + return { allowed: false, budget: blocked, emitExhausted: shouldEmit }; + } + await kv.set(KV.sessionBudget, sessionId, budget); + return { allowed: true, budget }; + }); + if (!result.allowed && result.emitExhausted) { + await emitBudgetEvent( + sdk, + "event::mem::budget::exhausted", + result.budget, + ); + } + return result; + } + + const inputTokens = nonNegativeInt(data?.inputTokens); + const outputTokens = nonNegativeInt(data?.outputTokens); + if (inputTokens === null || outputTokens === null) { + return { + success: false, + error: "inputTokens and outputTokens must be non-negative integers", + }; + } + + const result = await withKeyedLock(`session-budget:${sessionId}`, async () => { + const budget = await getOrCreateBudget(kv, sessionId, source); + const now = new Date().toISOString(); + const delta = inputTokens + outputTokens; + const tokensUsed = budget.tokensUsed + delta; + const warned = + budget.tokenCap > 0 && + !budget.warnEmittedAt && + tokensUsed >= Math.ceil(budget.tokenCap * SOFT_WARN_RATIO); + const exhausted = + budget.tokenCap > 0 && + !budget.exhaustedAt && + tokensUsed >= budget.tokenCap; + const next: SessionBudget = { + ...budget, + tokensUsed, + status: statusFor(budget.tokenCap, tokensUsed), + updatedAt: now, + lastRecordedAt: now, + lastModel: typeof data?.model === "string" ? data.model : undefined, + lastOperation: operation, + callCount: budget.callCount + 1, + failedCallCount: budget.failedCallCount + (data?.failed ? 1 : 0), + ...(warned ? { warnEmittedAt: now } : {}), + ...(exhausted ? { exhaustedAt: now } : {}), + }; + await kv.set(KV.sessionBudget, sessionId, next); + return { success: true, budget: next, warned, exhausted }; + }); + + getHistograms().sessionTokensUsed.record(result.budget.tokensUsed); + await safeAudit(kv, "session_budget_record", "mem::session::budget::record", [ + sessionId, + ], { + inputTokens, + outputTokens, + operation, + failed: data?.failed === true, + }); + if (result.warned) { + await emitBudgetEvent( + sdk, + "event::mem::budget::soft-warned", + result.budget, + ); + } + if (result.exhausted) { + await emitBudgetEvent( + sdk, + "event::mem::budget::exhausted", + result.budget, + ); + } + return result; + }, + ); + + sdk.registerFunction( + "mem::session::budget::reap", + async (data?: { retentionDays?: number }) => { + const retentionDays = + typeof data?.retentionDays === "number" && + Number.isFinite(data.retentionDays) && + data.retentionDays >= 0 + ? data.retentionDays + : DEFAULT_REAP_RETENTION_DAYS; + const cutoff = Date.now() - retentionDays * DAY_MS; + const [budgets, sessions] = await Promise.all([ + kv.list(KV.sessionBudget), + kv.list(KV.sessions), + ]); + const sessionsById = new Map(sessions.map((session) => [session.id, session])); + const reaped: string[] = []; + for (const budget of budgets) { + if (budget.sessionId === SYSTEM_SESSION_BUDGET_ID) continue; + const session = sessionsById.get(budget.sessionId); + if (!session?.endedAt) continue; + const endedAtMs = Date.parse(session.endedAt); + if (Number.isFinite(endedAtMs) && endedAtMs < cutoff) { + await kv.delete(KV.sessionBudget, budget.sessionId); + reaped.push(budget.sessionId); + } + } + if (reaped.length > 0) { + await safeAudit( + kv, + "session_budget_reap", + "mem::session::budget::reap", + reaped, + { retentionDays, reaped: reaped.length }, + ); + } + return { success: true, reaped: reaped.length }; + }, + ); + + sdk.registerTrigger({ + type: "timer", + function_id: "mem::session::budget::reap", + config: { + timer: { durationMs: DAY_MS, repeat: true }, + }, + }); +} diff --git a/src/functions/summarize.ts b/src/functions/summarize.ts index 0cc16b29..1bcbd5fd 100644 --- a/src/functions/summarize.ts +++ b/src/functions/summarize.ts @@ -19,6 +19,8 @@ import { validateOutput } from "../eval/validator.js"; import { scoreSummary } from "../eval/quality.js"; import type { MetricsStore } from "../eval/metrics-store.js"; import { safeAudit } from "./audit.js"; +import { runWithSessionContext } from "../session-context.js"; +import { isSessionTokenBudgetExceededError } from "./session-budget.js"; import { auditLlmFailure, isAuditRetryPayload } from "./audit-failures.js"; import { logger } from "../logger.js"; @@ -101,6 +103,7 @@ async function summarizeChunkWithRetry( attempt, }); } catch (err) { + if (isSessionTokenBudgetExceededError(err)) throw err; logger.warn("Summarize chunk LLM call failed", { sessionId, chunk: `${idx + 1}/${total}`, @@ -248,14 +251,48 @@ function parseSummaryXml( }; } +function buildBudgetTruncatedSummary( + session: Session, + compressed: CompressedObservation[], +): SessionSummary { + const files = new Set(); + const concepts = new Set(); + for (const obs of compressed) { + for (const file of obs.files ?? []) files.add(file); + for (const concept of obs.concepts ?? []) concepts.add(concept); + } + return { + sessionId: session.id, + project: session.project, + createdAt: new Date().toISOString(), + title: "Session budget exhausted", + narrative: + "Summary truncated because the session token budget was exhausted before another LLM call could run.", + keyDecisions: [], + filesModified: Array.from(files).slice(0, 50), + concepts: Array.from(concepts).slice(0, 50), + observationCount: compressed.length, + truncated: true, + }; +} + export function registerSummarizeFunction( sdk: ISdk, kv: StateKV, provider: MemoryProvider, metricsStore?: MetricsStore, ): void { - sdk.registerFunction("mem::summarize", - async (data: { sessionId: string; __agentmemoryAuditRetry?: boolean } | undefined) => { + sdk.registerFunction( + "mem::summarize", + async ( + data: { sessionId: string; __agentmemoryAuditRetry?: boolean } | undefined, + ) => + runWithSessionContext( + { + sessionId: data?.sessionId, + sourceFunction: "mem::summarize", + }, + async () => { const startMs = Date.now(); if (!data || typeof data.sessionId !== "string" || !data.sessionId.trim()) { return { success: false, error: "sessionId is required" }; @@ -459,6 +496,31 @@ export function registerSummarizeFunction( return { success: true, summary, qualityScore }; } catch (err) { + if (isSessionTokenBudgetExceededError(err)) { + const summary = buildBudgetTruncatedSummary(session, compressed); + await kv.set(KV.summaries, sessionId, summary); + const latencyMs = Date.now() - startMs; + if (metricsStore) { + await metricsStore.record("mem::summarize", latencyMs, true, 30); + } + await safeAudit(kv, "compress", "mem::summarize", [sessionId], { + observationCount: compressed.length, + provider: provider.name, + truncated: true, + reason: "session_budget_exhausted", + }, 30); + logger.warn("Summarize budget exhausted; stored truncated fallback", { + sessionId, + observationCount: compressed.length, + }); + return { + success: true, + summary, + qualityScore: 30, + truncated: true, + reason: "session_budget_exhausted", + }; + } const msg = err instanceof Error ? err.message : String(err); const latencyMs = Date.now() - startMs; if (metricsStore) { @@ -479,6 +541,7 @@ export function registerSummarizeFunction( } return { success: false, error: msg }; } - }, + }, + ), ); } diff --git a/src/index.ts b/src/index.ts index 6d838bca..c8cb61a5 100644 --- a/src/index.ts +++ b/src/index.ts @@ -109,6 +109,11 @@ import { registerRetentionFunctions } from "./functions/retention.js"; import { registerCompressFileFunction } from "./functions/compress-file.js"; import { registerReplayFunctions } from "./functions/replay.js"; import { registerOutlineFunctions } from "./functions/outline.js"; +import { + createSessionBudgetGuard, + registerSessionBudgetFunctions, + setSessionBudgetGuard, +} from "./functions/session-budget.js"; import { registerApiTriggers } from "./triggers/api.js"; import { registerEventTriggers } from "./triggers/events.js"; import { registerMcpEndpoints } from "./mcp/server.js"; @@ -248,6 +253,7 @@ async function main() { writeWorkerPidfile(); const kv = new StateKV(sdk); + setSessionBudgetGuard(createSessionBudgetGuard(sdk)); const secret = getEnvVar("AGENTMEMORY_SECRET"); const metricsStore = new MetricsStore(kv); const dedupMap = new DedupMap(); @@ -353,6 +359,7 @@ async function main() { ); const registerAllFunctions = () => { + registerSessionBudgetFunctions(sdk, kv); registerPrivacyFunction(sdk); registerObserveFunction(sdk, kv, dedupMap, config.maxObservationsPerSession); registerImageQuotaCleanup(sdk, kv); diff --git a/src/providers/resilient.ts b/src/providers/resilient.ts index 91905ab1..7509f862 100644 --- a/src/providers/resilient.ts +++ b/src/providers/resilient.ts @@ -2,6 +2,12 @@ import type { MemoryProvider, CircuitBreakerState } from "../types.js"; import { getEnvVar } from "../config.js"; import { outputLanguageDirective } from "../prompts/output-language.js"; import { CircuitBreaker } from "./circuit-breaker.js"; +import { getSessionContext } from "../session-context.js"; +import { + estimateTokens, + getSessionBudgetGuard, + isSessionTokenBudgetExceededError, +} from "../functions/session-budget.js"; export class ResilientProvider implements MemoryProvider { private breaker = new CircuitBreaker(); @@ -11,16 +17,53 @@ export class ResilientProvider implements MemoryProvider { this.name = `resilient(${inner.name})`; } - private async call(fn: () => Promise): Promise { + private async call( + operation: "compress" | "summarize", + systemPrompt: string, + userPrompt: string, + fn: () => Promise, + ): Promise { if (!this.breaker.isAllowed) { throw new Error("circuit_breaker_open"); } + const guard = getSessionBudgetGuard(); + const context = getSessionContext(); + if (guard) { + await guard.check({ + sessionId: context.sessionId, + operation, + providerName: this.name, + }); + } try { const result = await fn(); this.breaker.recordSuccess(); + if (guard) { + await guard.record({ + sessionId: context.sessionId, + operation, + providerName: this.name, + inputTokens: estimateTokens(systemPrompt, userPrompt), + outputTokens: estimateTokens(result), + failed: false, + }); + } return result; } catch (err) { + if (isSessionTokenBudgetExceededError(err)) { + throw err; + } this.breaker.recordFailure(); + if (guard) { + await guard.record({ + sessionId: context.sessionId, + operation, + providerName: this.name, + inputTokens: 0, + outputTokens: 0, + failed: true, + }); + } throw err; } } @@ -33,14 +76,16 @@ export class ResilientProvider implements MemoryProvider { } async compress(systemPrompt: string, userPrompt: string): Promise { - return this.call(() => - this.inner.compress(this.withOutputLanguage(systemPrompt), userPrompt), + const system = this.withOutputLanguage(systemPrompt); + return this.call("compress", system, userPrompt, () => + this.inner.compress(system, userPrompt), ); } async summarize(systemPrompt: string, userPrompt: string): Promise { - return this.call(() => - this.inner.summarize(this.withOutputLanguage(systemPrompt), userPrompt), + const system = this.withOutputLanguage(systemPrompt); + return this.call("summarize", system, userPrompt, () => + this.inner.summarize(system, userPrompt), ); } diff --git a/src/session-context.ts b/src/session-context.ts new file mode 100644 index 00000000..c89796ec --- /dev/null +++ b/src/session-context.ts @@ -0,0 +1,43 @@ +import { AsyncLocalStorage } from "node:async_hooks"; + +export const SYSTEM_SESSION_BUDGET_ID = "__system__"; + +export interface SessionContext { + sessionId: string; + project?: string; + sourceFunction?: string; +} + +const storage = new AsyncLocalStorage(); + +function normalizeSessionId(sessionId: string | undefined): string { + const trimmed = sessionId?.trim(); + return trimmed && trimmed.length > 0 ? trimmed : SYSTEM_SESSION_BUDGET_ID; +} + +export function runWithSessionContext( + context: { + sessionId?: string; + project?: string; + sourceFunction?: string; + }, + fn: () => T, +): T { + return storage.run( + { + sessionId: normalizeSessionId(context.sessionId), + ...(context.project ? { project: context.project } : {}), + sourceFunction: context.sourceFunction ?? "system", + }, + fn, + ); +} + +export function getSessionContext(): SessionContext { + return ( + storage.getStore() ?? { + sessionId: SYSTEM_SESSION_BUDGET_ID, + sourceFunction: "system", + } + ); +} diff --git a/src/state/schema.ts b/src/state/schema.ts index 4b2ddb50..50ca5ced 100644 --- a/src/state/schema.ts +++ b/src/state/schema.ts @@ -91,6 +91,7 @@ export const KV = { // the followup-rate diagnostic. Key = sessionId. TTL-swept hourly. recentSearches: "mem:recent-searches", outlines: "mem:outlines", + sessionBudget: "mem:session-budget", } as const; export const STREAM = { diff --git a/src/telemetry/setup.ts b/src/telemetry/setup.ts index a9316cb5..fe3fed34 100644 --- a/src/telemetry/setup.ts +++ b/src/telemetry/setup.ts @@ -62,6 +62,7 @@ interface Histograms { qualityScore: Histogram; embeddingLatency: Histogram; vectorSearchLatency: Histogram; + sessionTokensUsed: Histogram; } type Meter = { @@ -107,6 +108,7 @@ const HISTOGRAM_NAMES: Array<[keyof Histograms, string]> = [ ["qualityScore", "quality.score"], ["embeddingLatency", "embedding.latency_ms"], ["vectorSearchLatency", "vector_search.latency_ms"], + ["sessionTokensUsed", "agentmemory.session.tokens_used"], ]; // Accessors so functions outside `initMetrics`'s closure can record into diff --git a/src/triggers/api.ts b/src/triggers/api.ts index 5138e191..445996b5 100644 --- a/src/triggers/api.ts +++ b/src/triggers/api.ts @@ -16,6 +16,7 @@ import { getBoundViewerPort, getViewerSkipped } from "../viewer/server.js"; import { MAX_FILES_UPPER_BOUND } from "../functions/replay.js"; import { normalizeSessionMetadata } from "../functions/session-metadata.js"; import { normalizePositiveLimit } from "../functions/limits.js"; +import { getSessionBudgetSummary } from "../functions/session-budget.js"; import { resolveSessionReapThresholdMs } from "../functions/session-reaper.js"; import { logger } from "../logger.js"; import { @@ -53,6 +54,23 @@ function parseReceiptQuery(raw: unknown): boolean { return raw === "true"; } +function parseSessionTokenCapOverride( + raw: unknown, +): { ok: true; value?: number } | { ok: false; error: string } { + if (raw === undefined) return { ok: true }; + if ( + typeof raw === "number" && + Number.isSafeInteger(raw) && + raw >= 0 + ) { + return { ok: true, value: raw }; + } + return { + ok: false, + error: "sessionTokenCap must be a non-negative integer when provided", + }; +} + function parseSearchFormat( value: unknown, fallback: "full" | "compact", @@ -346,6 +364,12 @@ export function registerApiTriggers( const functionMetrics = metricsStore ? await metricsStore.getAll() : []; const circuitBreaker = provider && "circuitState" in provider ? provider.circuitState : null; + const sessionBudgets = await getSessionBudgetSummary(kv).catch((err) => { + logger.warn("session budget summary failed", { + error: err instanceof Error ? err.message : String(err), + }); + return null; + }); const status = health?.status || "healthy"; const statusCode = status === "critical" ? 503 : 200; @@ -359,6 +383,7 @@ export function registerApiTriggers( health: health || null, functionMetrics, circuitBreaker, + sessionBudgets, viewerPort: getBoundViewerPort(), viewerSkipped: getViewerSkipped(), }, @@ -776,6 +801,7 @@ export function registerApiTriggers( model?: string; agent?: unknown; metadata?: unknown; + sessionTokenCap?: number; }>, ): Promise => { const body = (req.body ?? {}) as Record; @@ -790,6 +816,13 @@ export function registerApiTriggers( }, }; } + const sessionTokenCap = parseSessionTokenCapOverride(body.sessionTokenCap); + if (!sessionTokenCap.ok) { + return { + status_code: 400, + body: { error: sessionTokenCap.error }, + }; + } const title = asSessionPreview(body.title, 200); const summary = asSessionPreview(body.summary, 300) ?? title; const firstPrompt = asSessionPreview(body.firstPrompt, 200) ?? title; @@ -823,6 +856,21 @@ export function registerApiTriggers( ...(sessionMetadata.metadata ? { metadata: sessionMetadata.metadata } : {}), }; await kv.set(KV.sessions, sessionId, session); + const budgetPayload = + sessionTokenCap.value !== undefined + ? { sessionId, tokenCap: sessionTokenCap.value, source: "session_override" } + : { sessionId, source: "env" }; + try { + await sdk.trigger({ + function_id: "mem::session::budget::init", + payload: budgetPayload, + }); + } catch (err) { + logger.warn("session budget init failed during session start", { + sessionId, + error: err instanceof Error ? err.message : String(err), + }); + } const contextResult = await sdk.trigger< { sessionId: string; project: string }, { context: string } diff --git a/src/triggers/events.ts b/src/triggers/events.ts index f22ae815..5bef6684 100644 --- a/src/triggers/events.ts +++ b/src/triggers/events.ts @@ -44,6 +44,17 @@ export function registerEventTriggers(sdk: ISdk, kv: StateKV): void { ...(normalized.ok && normalized.metadata ? { metadata: normalized.metadata } : {}), }; await kv.set(KV.sessions, data.sessionId, session); + try { + await sdk.trigger({ + function_id: "mem::session::budget::init", + payload: { sessionId: data.sessionId, source: "env" }, + }); + } catch (err) { + logger.warn("session budget init failed during session started event", { + sessionId: data.sessionId, + error: err instanceof Error ? err.message : String(err), + }); + } const contextResult = await sdk.trigger< { sessionId: string; project: string }, { context: string } diff --git a/src/types.ts b/src/types.ts index db15f5f5..3ce464f8 100644 --- a/src/types.ts +++ b/src/types.ts @@ -213,6 +213,48 @@ export interface SessionSummary { filesModified: string[]; concepts: string[]; observationCount: number; + truncated?: boolean; +} + +export type SessionBudgetStatus = + | "active" + | "near_cap" + | "exhausted" + | "disabled"; +export type SessionBudgetSource = + | "env" + | "session_override" + | "implicit" + | "system"; +export type SessionBudgetOperation = "compress" | "summarize"; + +export interface SessionBudget { + sessionId: string; + tokenCap: number; + tokensUsed: number; + costEstimate: number; + status: SessionBudgetStatus; + source: SessionBudgetSource; + estimated: true; + createdAt: string; + updatedAt: string; + callCount: number; + blockedCallCount: number; + failedCallCount: number; + warnEmittedAt?: string; + exhaustedAt?: string; + lastBlockedAt?: string; + lastRecordedAt?: string; + lastModel?: string; + lastOperation?: SessionBudgetOperation; +} + +export interface SessionBudgetSummary { + enabled: boolean; + active: number; + nearCap: number; + exhausted: number; + total: number; } export type HookType = @@ -277,6 +319,7 @@ export interface AgentMemoryConfig { viewerPort: number; provider: ProviderConfig; tokenBudget: number; + sessionTokenCap: number; maxObservationsPerSession: number; compressionModel: string; dataDir: string; @@ -753,7 +796,10 @@ export interface AuditEntry { | "slot_replace" | "slot_create" | "slot_delete" - | "slot_reflect"; + | "slot_reflect" + | "session_budget_init" + | "session_budget_record" + | "session_budget_reap"; userId?: string; functionId: string; targetIds: string[]; diff --git a/test/api-session-start.test.ts b/test/api-session-start.test.ts index fe50a306..5b610ae3 100644 --- a/test/api-session-start.test.ts +++ b/test/api-session-start.test.ts @@ -32,9 +32,14 @@ import { mockKV, mockSdk } from "./helpers/mocks.js"; function setupApi() { const sdk = mockSdk(); const kv = mockKV(); + const budgetInits: unknown[] = []; sdk.registerFunction("mem::context", async () => ({ context: "" })); + sdk.registerFunction("mem::session::budget::init", async (payload) => { + budgetInits.push(payload); + return { success: true }; + }); registerApiTriggers(sdk as never, kv as never); - return { sdk, kv }; + return { sdk, kv, budgetInits }; } describe("api::session::start", () => { @@ -141,6 +146,48 @@ describe("api::session::start", () => { expect(res.body.session.agentId).toBe("isolated-agent"); }); + it("whitelists per-session token cap override into budget init", async () => { + const { sdk, budgetInits } = setupApi(); + + const res = (await sdk.trigger("api::session::start", { + body: { + sessionId: "ses_budget_override", + project: "/tmp/project", + cwd: "/tmp/project", + sessionTokenCap: 25_000, + }, + })) as { status_code: number }; + + expect(res.status_code).toBe(200); + expect(budgetInits).toEqual([ + { + sessionId: "ses_budget_override", + tokenCap: 25_000, + source: "session_override", + }, + ]); + }); + + it("rejects malformed per-session token cap before writing a session", async () => { + const { sdk, kv, budgetInits } = setupApi(); + + const res = (await sdk.trigger("api::session::start", { + body: { + sessionId: "ses_bad_budget", + project: "/tmp/project", + cwd: "/tmp/project", + sessionTokenCap: "25000", + }, + })) as { status_code: number; body: { error: string } }; + + expect(res.status_code).toBe(400); + expect(res.body.error).toBe( + "sessionTokenCap must be a non-negative integer when provided", + ); + expect(await kv.get(KV.sessions, "ses_bad_budget")).toBeNull(); + expect(budgetInits).toEqual([]); + }); + it("rejects malformed session metadata before writing a session", async () => { const { sdk, kv } = setupApi(); diff --git a/test/cli-status-budget.test.ts b/test/cli-status-budget.test.ts new file mode 100644 index 00000000..2609c485 --- /dev/null +++ b/test/cli-status-budget.test.ts @@ -0,0 +1,27 @@ +import { describe, expect, it } from "vitest"; + +import { renderSessionBudgetStatus } from "../src/cli/status-render.js"; + +describe("agentmemory status session budget line", () => { + it("renders active, near-cap, and exhausted budget counts", () => { + expect( + renderSessionBudgetStatus({ + enabled: true, + active: 3, + nearCap: 2, + exhausted: 1, + }), + ).toBe("Sessions: 3 active, 2 near-cap, 1 exhausted"); + }); + + it("returns null when session budget enforcement is disabled", () => { + expect( + renderSessionBudgetStatus({ + enabled: false, + active: 0, + nearCap: 0, + exhausted: 0, + }), + ).toBeNull(); + }); +}); diff --git a/test/compress-budget.test.ts b/test/compress-budget.test.ts new file mode 100644 index 00000000..b0d7a2f5 --- /dev/null +++ b/test/compress-budget.test.ts @@ -0,0 +1,87 @@ +import { describe, expect, it, vi } from "vitest"; + +vi.mock("../src/logger.js", () => ({ + logger: { info: vi.fn(), warn: vi.fn(), error: vi.fn() }, +})); + +vi.mock("../src/functions/search.js", () => ({ + getSearchIndex: () => ({ add: vi.fn() }), + vectorIndexAddGuarded: vi.fn().mockResolvedValue(false), +})); + +import { registerCompressFunction } from "../src/functions/compress.js"; +import { SessionTokenBudgetExceededError } from "../src/functions/session-budget.js"; +import type { + CompressedObservation, + MemoryProvider, + RawObservation, +} from "../src/types.js"; + +function mockKV() { + const store = new Map>(); + return { + get: async (scope: string, key: string): Promise => + (store.get(scope)?.get(key) as T) ?? null, + set: async (scope: string, key: string, data: T): Promise => { + if (!store.has(scope)) store.set(scope, new Map()); + store.get(scope)!.set(key, data); + return data; + }, + list: async (scope: string): Promise => + (store.get(scope) + ? Array.from(store.get(scope)!.values()) + : []) as T[], + }; +} + +describe("mem::compress session budget fallback", () => { + it("stores a synthetic compressed observation when budget is exhausted", async () => { + let handler: Function | null = null; + const sdk = { + registerFunction: vi.fn((id: string, fn: Function) => { + if (id === "mem::compress") handler = fn; + }), + trigger: vi.fn().mockResolvedValue(undefined), + }; + const kv = mockKV(); + const provider: MemoryProvider = { + name: "budgeted", + async compress(): Promise { + throw new SessionTokenBudgetExceededError("ses_budget", 100, 100); + }, + async summarize(): Promise { + return ""; + }, + }; + registerCompressFunction(sdk as never, kv as never, provider); + + const raw: RawObservation = { + id: "raw_1", + sessionId: "ses_budget", + timestamp: "2026-06-19T12:00:00.000Z", + hookType: "post_tool_use", + toolName: "Read", + toolInput: { file_path: "src/a.ts" }, + toolOutput: "file content", + raw: {}, + }; + const result = (await handler!({ + observationId: "obs_budget", + sessionId: "ses_budget", + raw, + })) as { success: boolean; compressed: CompressedObservation }; + + expect(result.success).toBe(true); + expect(result.compressed.id).toBe("obs_budget"); + expect(result.compressed.sessionId).toBe("ses_budget"); + expect(result.compressed.title).toBeTruthy(); + const stored = await kv.get( + "mem:obs:ses_budget", + "obs_budget", + ); + expect(stored).toMatchObject({ + id: "obs_budget", + sessionId: "ses_budget", + }); + }); +}); diff --git a/test/observe-implicit-session.test.ts b/test/observe-implicit-session.test.ts index 9500d876..de628c76 100644 --- a/test/observe-implicit-session.test.ts +++ b/test/observe-implicit-session.test.ts @@ -69,6 +69,11 @@ describe("observe implicit session create (#638)", () => { const { registerObserveFunction } = await import("../src/functions/observe.js"); const sdk = mockSdk(); const kv = mockKV(); + const budgetInits: unknown[] = []; + sdk.registerFunction("mem::session::budget::init", async (payload) => { + budgetInits.push(payload); + return { success: true }; + }); registerObserveFunction(sdk as never, kv as never); const result = (await sdk.trigger("mem::observe", { @@ -92,6 +97,12 @@ describe("observe implicit session create (#638)", () => { expect(session.status).toBe("active"); expect(session.observationCount).toBe(1); expect(session.firstPrompt).toBe("ship the helm chart"); + expect(budgetInits).toEqual([ + { + sessionId: "ses_opencode_abc", + source: "implicit", + }, + ]); }); it("does not implicit-create when project+cwd missing (test-payload back-compat)", async () => { diff --git a/test/resilient-provider.test.ts b/test/resilient-provider.test.ts index 231b03e4..bf7f7341 100644 --- a/test/resilient-provider.test.ts +++ b/test/resilient-provider.test.ts @@ -1,6 +1,11 @@ -import { afterEach, describe, expect, it } from "vitest"; +import { afterEach, describe, expect, it, vi } from "vitest"; import { ResilientProvider } from "../src/providers/resilient.js"; +import { + SessionTokenBudgetExceededError, + setSessionBudgetGuard, +} from "../src/functions/session-budget.js"; +import { runWithSessionContext } from "../src/session-context.js"; import type { MemoryProvider } from "../src/types.js"; function restoreOutputLang(value: string | undefined): void { @@ -39,6 +44,7 @@ describe("ResilientProvider output language", () => { afterEach(() => { restoreOutputLang(savedOutputLang); + setSessionBudgetGuard(null); }); it("forwards compress system prompts unchanged when output language is blank", async () => { @@ -107,3 +113,143 @@ describe("ResilientProvider output language", () => { expect(inner.calls[0].user).toBe("user prompt"); }); }); + +describe("ResilientProvider session budget guard", () => { + afterEach(() => { + setSessionBudgetGuard(null); + }); + + it("checks the active ALS session before calling the inner provider", async () => { + const inner = makeCapturingProvider(); + const provider = new ResilientProvider(inner); + const guard = { + check: async () => { + throw new SessionTokenBudgetExceededError("ses_blocked", 100, 100); + }, + record: async () => {}, + }; + setSessionBudgetGuard(guard); + + await expect( + runWithSessionContext({ sessionId: "ses_blocked" }, () => + provider.compress("system", "user"), + ), + ).rejects.toThrow(SessionTokenBudgetExceededError); + + expect(inner.calls).toHaveLength(0); + }); + + it("records estimated tokens after successful compress and summarize calls", async () => { + const inner = makeCapturingProvider(); + const provider = new ResilientProvider(inner); + const calls: unknown[] = []; + setSessionBudgetGuard({ + check: async (call) => { + calls.push({ phase: "check", ...call }); + }, + record: async (call) => { + calls.push({ phase: "record", ...call }); + }, + }); + + await runWithSessionContext({ sessionId: "ses_record" }, async () => { + await provider.compress("system", "user"); + await provider.summarize("summary system", "summary user"); + }); + + expect(calls).toEqual([ + expect.objectContaining({ + phase: "check", + sessionId: "ses_record", + operation: "compress", + }), + expect.objectContaining({ + phase: "record", + sessionId: "ses_record", + operation: "compress", + inputTokens: expect.any(Number), + outputTokens: expect.any(Number), + failed: false, + }), + expect.objectContaining({ + phase: "check", + sessionId: "ses_record", + operation: "summarize", + }), + expect.objectContaining({ + phase: "record", + sessionId: "ses_record", + operation: "summarize", + inputTokens: expect.any(Number), + outputTokens: expect.any(Number), + failed: false, + }), + ]); + }); + + it("records zero tokens for failed provider calls", async () => { + const inner: MemoryProvider = { + name: "fail", + async compress(): Promise { + throw new Error("provider failed"); + }, + async summarize(): Promise { + return "unused"; + }, + }; + const provider = new ResilientProvider(inner); + const records: unknown[] = []; + setSessionBudgetGuard({ + check: async () => {}, + record: async (call) => { + records.push(call); + }, + }); + + await expect( + runWithSessionContext({ sessionId: "ses_failed" }, () => + provider.compress("system", "user"), + ), + ).rejects.toThrow("provider failed"); + + expect(records).toEqual([ + expect.objectContaining({ + sessionId: "ses_failed", + operation: "compress", + inputTokens: 0, + outputTokens: 0, + failed: true, + }), + ]); + }); + + it("does not check budget when the circuit is already open", async () => { + const inner: MemoryProvider = { + name: "fail", + async compress(): Promise { + throw new Error("provider failed"); + }, + async summarize(): Promise { + return "unused"; + }, + }; + const provider = new ResilientProvider(inner); + const guard = { + check: async () => {}, + record: async () => {}, + }; + setSessionBudgetGuard(guard); + + await expect(provider.compress("system", "user")).rejects.toThrow(); + await expect(provider.compress("system", "user")).rejects.toThrow(); + await expect(provider.compress("system", "user")).rejects.toThrow(); + const checkSpy = vi.spyOn(guard, "check"); + checkSpy.mockClear(); + + await expect(provider.compress("system", "user")).rejects.toThrow( + "circuit_breaker_open", + ); + + expect(checkSpy).not.toHaveBeenCalled(); + }); +}); diff --git a/test/session-budget.test.ts b/test/session-budget.test.ts new file mode 100644 index 00000000..1d2068e9 --- /dev/null +++ b/test/session-budget.test.ts @@ -0,0 +1,198 @@ +import { describe, expect, it, vi } from "vitest"; + +vi.mock("../src/logger.js", () => ({ + logger: { info: vi.fn(), warn: vi.fn(), error: vi.fn() }, +})); + +import { DEFAULT_SESSION_TOKEN_CAP, getSessionTokenCap } from "../src/config.js"; +import { + SYSTEM_SESSION_BUDGET_ID, + registerSessionBudgetFunctions, +} from "../src/functions/session-budget.js"; +import { KV } from "../src/state/schema.js"; +import { mockKV, mockSdk } from "./helpers/mocks.js"; +import type { SessionBudget } from "../src/types.js"; + +function setupBudget() { + const sdk = mockSdk(); + const kv = mockKV(); + const events: Array<{ type: string; payload: unknown }> = []; + sdk.registerFunction("event::mem::budget::soft-warned", async (payload) => { + events.push({ type: "soft", payload }); + return { success: true }; + }); + sdk.registerFunction("event::mem::budget::exhausted", async (payload) => { + events.push({ type: "exhausted", payload }); + return { success: true }; + }); + registerSessionBudgetFunctions(sdk as never, kv as never); + return { sdk, kv, events }; +} + +describe("session token budget config", () => { + it("adds the exact session budget KV scope", () => { + expect(KV.sessionBudget).toBe("mem:session-budget"); + }); + + it("defaults to 100000 tokens when unset or blank", () => { + expect(DEFAULT_SESSION_TOKEN_CAP).toBe(100_000); + expect(getSessionTokenCap({})).toBe(100_000); + expect(getSessionTokenCap({ AGENTMEMORY_SESSION_TOKEN_CAP: "" })).toBe( + 100_000, + ); + }); + + it("allows zero to disable session budget enforcement", () => { + expect(getSessionTokenCap({ AGENTMEMORY_SESSION_TOKEN_CAP: "0" })).toBe(0); + }); + + it("accepts positive integer override values", () => { + expect(getSessionTokenCap({ AGENTMEMORY_SESSION_TOKEN_CAP: "25000" })).toBe( + 25_000, + ); + }); + + it("falls back to default for malformed configured values", () => { + expect(getSessionTokenCap({ AGENTMEMORY_SESSION_TOKEN_CAP: "100k" })).toBe( + 100_000, + ); + expect(getSessionTokenCap({ AGENTMEMORY_SESSION_TOKEN_CAP: "-1" })).toBe( + 100_000, + ); + expect(getSessionTokenCap({ AGENTMEMORY_SESSION_TOKEN_CAP: "1.5" })).toBe( + 100_000, + ); + }); +}); + +describe("mem::session::budget functions", () => { + it("initializes a per-session budget with an override cap", async () => { + const { sdk, kv } = setupBudget(); + + const result = (await sdk.trigger("mem::session::budget::init", { + sessionId: "ses_budget", + tokenCap: 25, + })) as { success: true; budget: SessionBudget }; + + expect(result.budget).toMatchObject({ + sessionId: "ses_budget", + tokenCap: 25, + tokensUsed: 0, + status: "active", + source: "session_override", + }); + await expect( + kv.get(KV.sessionBudget, "ses_budget"), + ).resolves.toMatchObject({ tokenCap: 25 }); + }); + + it("emits soft warning once and exhausted event at the hard cap", async () => { + const { sdk, kv, events } = setupBudget(); + await sdk.trigger("mem::session::budget::init", { + sessionId: "ses_warn", + tokenCap: 10, + }); + + const warn = (await sdk.trigger("mem::session::budget::record", { + sessionId: "ses_warn", + inputTokens: 4, + outputTokens: 4, + model: "test-model", + operation: "compress", + })) as { warned: boolean; exhausted: boolean }; + const exhaust = (await sdk.trigger("mem::session::budget::record", { + sessionId: "ses_warn", + inputTokens: 1, + outputTokens: 1, + model: "test-model", + operation: "summarize", + })) as { warned: boolean; exhausted: boolean }; + await sdk.trigger("mem::session::budget::record", { + sessionId: "ses_warn", + inputTokens: 0, + outputTokens: 0, + model: "test-model", + operation: "summarize", + phase: "check", + }); + + const stored = await kv.get(KV.sessionBudget, "ses_warn"); + expect(warn.warned).toBe(true); + expect(exhaust.exhausted).toBe(true); + expect(stored).toMatchObject({ + tokensUsed: 10, + status: "exhausted", + }); + expect(events.map((event) => event.type)).toEqual([ + "soft", + "exhausted", + ]); + }); + + it("records failed provider calls as zero tokens", async () => { + const { sdk, kv } = setupBudget(); + await sdk.trigger("mem::session::budget::init", { + sessionId: "ses_fail", + tokenCap: 10, + }); + + await sdk.trigger("mem::session::budget::record", { + sessionId: "ses_fail", + inputTokens: 0, + outputTokens: 0, + model: "test-model", + operation: "compress", + failed: true, + }); + + await expect( + kv.get(KV.sessionBudget, "ses_fail"), + ).resolves.toMatchObject({ + tokensUsed: 0, + failedCallCount: 1, + }); + }); + + it("uses the system sentinel and lazy init when no session id is active", async () => { + const { sdk, kv } = setupBudget(); + + await sdk.trigger("mem::session::budget::record", { + inputTokens: 2, + outputTokens: 3, + model: "test-model", + operation: "summarize", + }); + + await expect( + kv.get(KV.sessionBudget, SYSTEM_SESSION_BUDGET_ID), + ).resolves.toMatchObject({ + sessionId: SYSTEM_SESSION_BUDGET_ID, + tokensUsed: 5, + source: "system", + }); + }); + + it("applies concurrent increments without losing usage", async () => { + const { sdk, kv } = setupBudget(); + await sdk.trigger("mem::session::budget::init", { + sessionId: "ses_concurrent", + tokenCap: 1_000, + }); + + await Promise.all( + Array.from({ length: 20 }, () => + sdk.trigger("mem::session::budget::record", { + sessionId: "ses_concurrent", + inputTokens: 1, + outputTokens: 1, + model: "test-model", + operation: "compress", + }), + ), + ); + + await expect( + kv.get(KV.sessionBudget, "ses_concurrent"), + ).resolves.toMatchObject({ tokensUsed: 40 }); + }); +}); diff --git a/test/session-context.test.ts b/test/session-context.test.ts new file mode 100644 index 00000000..a721f88e --- /dev/null +++ b/test/session-context.test.ts @@ -0,0 +1,44 @@ +import { describe, expect, it } from "vitest"; + +import { + SYSTEM_SESSION_BUDGET_ID, + getSessionContext, + runWithSessionContext, +} from "../src/session-context.js"; + +describe("session context", () => { + it("falls back to the system budget sentinel outside a session scope", () => { + expect(getSessionContext()).toMatchObject({ + sessionId: SYSTEM_SESSION_BUDGET_ID, + sourceFunction: "system", + }); + }); + + it("preserves independent session ids across awaited work", async () => { + const [a, b] = await Promise.all([ + runWithSessionContext( + { sessionId: "ses_a", sourceFunction: "mem::compress" }, + async () => { + await Promise.resolve(); + return getSessionContext(); + }, + ), + runWithSessionContext( + { sessionId: "ses_b", sourceFunction: "mem::summarize" }, + async () => { + await Promise.resolve(); + return getSessionContext(); + }, + ), + ]); + + expect(a).toMatchObject({ + sessionId: "ses_a", + sourceFunction: "mem::compress", + }); + expect(b).toMatchObject({ + sessionId: "ses_b", + sourceFunction: "mem::summarize", + }); + }); +}); diff --git a/test/summarize.test.ts b/test/summarize.test.ts index 4e20e27a..24848077 100644 --- a/test/summarize.test.ts +++ b/test/summarize.test.ts @@ -30,6 +30,7 @@ vi.mock("../src/functions/audit.js", () => ({ })); import { registerSummarizeFunction } from "../src/functions/summarize.js"; +import { SessionTokenBudgetExceededError } from "../src/functions/session-budget.js"; import { safeAudit } from "../src/functions/audit.js"; import type { CompressedObservation, @@ -678,4 +679,65 @@ describe("mem::summarize chunking", () => { expect(result.success).toBe(false); expect(safeAudit).not.toHaveBeenCalled(); }); + + it("stores a truncated deterministic summary when the session budget is exhausted", async () => { + const provider: MemoryProvider & { calls: Array<{ system: string; user: string }> } = { + name: "test", + calls: [], + compress: async () => "", + summarize: async (system: string, user: string) => { + provider.calls.push({ system, user }); + throw new SessionTokenBudgetExceededError("ses_budget_single", 100, 100); + }, + }; + const { handler, kv } = await setupHandler({ + sessionId: "ses_budget_single", + obsCount: 3, + provider, + }); + + const result: any = await handler({ sessionId: "ses_budget_single" }); + + expect(result.success).toBe(true); + expect(result.summary.truncated).toBe(true); + expect(result.summary.title).toContain("budget"); + expect(provider.calls).toHaveLength(1); + const stored: any = await kv.get("summaries", "ses_budget_single"); + expect(stored.truncated).toBe(true); + expect(stored.observationCount).toBe(3); + }); + + it("does not retry chunked summarize after budget exhaustion and skips reduce", async () => { + process.env.SUMMARIZE_CHUNK_SIZE = "100"; + process.env.SUMMARIZE_CHUNK_CONCURRENCY = "1"; + let calls = 0; + const provider: MemoryProvider & { calls: Array<{ system: string; user: string }> } = { + name: "test", + calls: [], + compress: async () => "", + summarize: async (system: string, user: string) => { + provider.calls.push({ system, user }); + calls += 1; + if (calls === 1) return summaryXml({ title: "Chunk 1" }); + throw new SessionTokenBudgetExceededError("ses_budget_chunk", 100, 100); + }, + }; + const { handler, kv } = await setupHandler({ + sessionId: "ses_budget_chunk", + obsCount: 250, + provider, + }); + + const result: any = await handler({ sessionId: "ses_budget_chunk" }); + + expect(result.success).toBe(true); + expect(result.summary.truncated).toBe(true); + expect(provider.calls).toHaveLength(2); + expect(provider.calls.some((call) => call.system.includes("merging"))).toBe( + false, + ); + const stored: any = await kv.get("summaries", "ses_budget_chunk"); + expect(stored.truncated).toBe(true); + expect(stored.observationCount).toBe(250); + }); }); diff --git a/test/telemetry-session-budget.test.ts b/test/telemetry-session-budget.test.ts new file mode 100644 index 00000000..e2040381 --- /dev/null +++ b/test/telemetry-session-budget.test.ts @@ -0,0 +1,19 @@ +import { describe, expect, it, vi } from "vitest"; + +import { initMetrics } from "../src/telemetry/setup.js"; + +describe("session budget telemetry", () => { + it("registers the required session tokens histogram", () => { + const created: string[] = []; + + initMetrics(() => ({ + createCounter: vi.fn(() => ({ add: vi.fn() })), + createHistogram: vi.fn((name: string) => { + created.push(name); + return { record: vi.fn() }; + }), + })); + + expect(created).toContain("agentmemory.session.tokens_used"); + }); +});