diff --git a/.env.example b/.env.example index e1bda5cc..665cf382 100644 --- a/.env.example +++ b/.env.example @@ -170,6 +170,10 @@ # AGENTMEMORY_HIGH_ORDER_CONTEXT=false # Opt out of semantic/procedural/crystal/insight context blocks and smart-search arrays. Unset follows CONSOLIDATION_ENABLED. # EVICTION_ENABLED=true # Run mem::evict on a timer. Default on; set false to disable scheduled eviction. # EVICTION_INTERVAL_MS=86400000 # Scheduled mem::evict interval in ms. Default 24h. +# AGENTMEMORY_REAP_ENABLED=true # Recover active sessions abandoned by abrupt process exits. Default on. +# AGENTMEMORY_REAP_ON_STARTUP=true # Run one background abandoned-session reconciliation at startup. Default on. +# AGENTMEMORY_REAP_THRESHOLD_MS=14400000 # Inactivity threshold for recovery. Default 4h. +# AGENTMEMORY_REAP_INTERVAL_MS=0 # Optional periodic recovery interval in ms. Default 0 (disabled). # CONSOLIDATION_DECAY_DAYS=30 # Age (days) after which non-reinforced memories decay during consolidation # GRAPH_EXTRACTION_ENABLED=true # Extract concept-graph edges on remember; powers the graph-traversal recall path # GRAPH_EXTRACTION_BATCH_SIZE=8 # Memories per graph-extraction batch diff --git a/AGENTS.md b/AGENTS.md index b8a8c0bc..a489d956 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -118,7 +118,7 @@ Hook scripts in `src/hooks/` are standalone Node.js scripts (no iii-sdk import). 
## Current Stats (v0.9.28) - 61 MCP tools (8 visible by default, `AGENTMEMORY_TOOLS=all` for all) -- 134 REST endpoints +- 135 REST endpoints - 6 MCP resources, 3 MCP prompts - 12 hooks, 15 skills - 50+ iii functions diff --git a/README.md b/README.md index 5a0cf203..4851e11a 100644 --- a/README.md +++ b/README.md @@ -474,6 +474,7 @@ agentmemory stop # tear it down agentmemory remove # uninstall everything we created agentmemory connect claude-code # wire one agent agentmemory doctor # interactive diagnostics + fix prompts +agentmemory reap --threshold 4h # recover sessions abandoned by abrupt exits ``` From v0.9.16 onward, the first npx run prompts you to install globally inline — answer `Y` once and you're set. If you skip, fall back to either of these for a fresh fetch: @@ -505,6 +506,17 @@ For very large imports, set `AGENTMEMORY_IMPORT_TIMEOUT_MS` to raise the CLI's i > **Heads-up if you rely on `import-jsonl` as your primary capture path:** Claude Code's `cleanupPeriodDays` (in `~/.claude/settings.json`, default **30**) auto-deletes JSONL transcripts older than that window from `~/.claude/projects/`. If you install agentmemory fresh on a months-old Claude Code history, anything older than 30 days is already gone before the first import. Either run `import-jsonl` on a cron, raise `cleanupPeriodDays` to something higher, or wire the auto-capture hooks (the default plugin install path) so each turn lands in agentmemory while the session is live and the JSONL cleanup stops mattering. +### Abandoned Session Recovery + +If a host sleeps, crashes, OOM-kills the worker, or exits before the session-end hook fires, the worker now reconciles active sessions whose last activity is older than the recovery threshold. 
Startup runs one background reconciliation by default, and periodic recovery remains opt-in: + +```bash +agentmemory reap --threshold 4h +agentmemory reap --dry-run +``` + +Tune the daemon behavior with `AGENTMEMORY_REAP_THRESHOLD_MS=14400000`, `AGENTMEMORY_REAP_ENABLED=true`, `AGENTMEMORY_REAP_ON_STARTUP=true`, and `AGENTMEMORY_REAP_INTERVAL_MS=0`. + ### Upgrade / Maintenance Use the maintenance command when you intentionally want to update your local runtime: @@ -1799,6 +1811,10 @@ Create `~/.agentmemory/.env`: # AGENTMEMORY_HIGH_ORDER_CONTEXT=false # opt out of semantic/procedural/crystal/insight context + smart-search arrays # EVICTION_ENABLED=true # ON by default. Runs mem::evict on a timer. # EVICTION_INTERVAL_MS=86400000 # Default: 24h +# AGENTMEMORY_REAP_ENABLED=true # ON by default. Recovers sessions abandoned by abrupt exits. +# AGENTMEMORY_REAP_ON_STARTUP=true # Default: run one background reconciliation on daemon startup. +# AGENTMEMORY_REAP_THRESHOLD_MS=14400000 # Default: 4h inactivity threshold. +# AGENTMEMORY_REAP_INTERVAL_MS=0 # Default: periodic recovery disabled. # LESSON_DECAY_ENABLED=true # OBSIDIAN_AUTO_EXPORT=false # AGENTMEMORY_EXPORT_ROOT=~/.agentmemory @@ -1823,7 +1839,7 @@ Create `~/.agentmemory/.env`:

API

-134 endpoints on port `3111`. The REST API binds to `127.0.0.1` by default. Protected endpoints require `Authorization: Bearer <secret>` when `AGENTMEMORY_SECRET` is set, and mesh sync endpoints require `AGENTMEMORY_SECRET` on both peers. +135 endpoints on port `3111`. The REST API binds to `127.0.0.1` by default. Protected endpoints require `Authorization: Bearer <secret>` when `AGENTMEMORY_SECRET` is set, and mesh sync endpoints require `AGENTMEMORY_SECRET` on both peers. ### Health Thresholds diff --git a/assets/tags/light/section-api.svg b/assets/tags/light/section-api.svg index bdfc3e65..474be869 100644 --- a/assets/tags/light/section-api.svg +++ b/assets/tags/light/section-api.svg @@ -12,5 +12,5 @@ API - 134 REST endpoints + 135 REST endpoints diff --git a/assets/tags/section-api.svg b/assets/tags/section-api.svg index 8db1acb4..64a27d1c 100644 --- a/assets/tags/section-api.svg +++ b/assets/tags/section-api.svg @@ -12,5 +12,5 @@ API - 134 REST endpoints + 135 REST endpoints diff --git a/docs/todos/2026-06-19-issue-298-abandoned-session-recovery/plan.md b/docs/todos/2026-06-19-issue-298-abandoned-session-recovery/plan.md new file mode 100644 index 00000000..1332d7ad --- /dev/null +++ b/docs/todos/2026-06-19-issue-298-abandoned-session-recovery/plan.md @@ -0,0 +1,446 @@ +# Abandoned Session Recovery Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Recover abandoned active sessions older than a configurable inactivity threshold without deleting their observations. + +**Architecture:** Add one shared `mem::session-reap` function that owns threshold parsing, last-activity detection, locked recovery, audit logging, and post-batch consolidation. REST, CLI, startup reconciliation, and diagnostics become thin adapters around that function. 
+ +**Tech Stack:** TypeScript ESM, iii-sdk registered functions/triggers, existing `StateKV`, `withKeyedLock`, `recordAudit`, existing REST/CLI helpers, Vitest. + +**Status:** Implemented and verified in the task record. The task checkboxes +below preserve the original TDD execution plan; final evidence lives in +`todo.md`. + +--- + +## Sprint Contract Carry-Forward + +Scope: +- Create backend recovery for active sessions whose last valid activity is older than the threshold. +- Expose recovery through `POST /agentmemory/sessions/reap`, `agentmemory reap`, startup reconciliation, and `mem::diagnose`/`mem::heal`. +- Use existing `event::session::stopped`, `mem::consolidate-pipeline`, `KV.sessions`, observation scopes, and audit. + +Non-goals: +- No schema migration, dependency change, new KV scope, MCP tool, auth behavior change, or viewer banner. +- Do not refactor `api::session::end` or `mem::evict` unless implementation proves a surgical shared helper is necessary. +- Do not add `--no-consolidation` or other bypass controls. + +Acceptance: +- Last activity is newest valid timestamp among `session.startedAt`, existing `session.updatedAt`, and observation timestamps. +- Exact threshold uses `ageMs >= thresholdMs`. +- Candidate detection first prefilters by valid session-level `updatedAt`/`startedAt`; it lists observations only when that session-level timestamp is stale or invalid. +- Recovery re-checks under the same `withKeyedLock("obs:" + session.id, ...)` key used by `mem::observe`. +- Dry run performs no writes, lifecycle triggers, consolidation, or audit. +- Recovery marks sessions completed, sets `endedAt` to last activity, awaits stopped lifecycle, records a `"heal"` audit entry, and runs consolidation once when at least one session recovered. +- Startup reconciliation is backgrounded and periodic reconciliation is opt-in only. 
+ +## File Structure + +Create: +- `src/functions/session-reaper.ts`: threshold parsing, last-activity helpers, candidate detection, recovery function registration. +- `src/functions/session-reaper-scheduler.ts`: startup and optional periodic local reconciliation. +- `test/session-reaper.test.ts`: core TDD coverage. +- `test/session-reaper-scheduler.test.ts`: scheduler TDD coverage. + +Modify: +- `src/index.ts`: register reaper, start scheduler, clear handles, endpoint count. +- `src/triggers/api.ts`: add `api::session-reap` and `/agentmemory/sessions/reap`. +- `src/cli.ts`: add `reap` help and command. +- `src/functions/diagnostics.ts`: use shared abandoned-session detection and heal via reaper. +- `README.md`, `AGENTS.md`, `.env.example`, `assets/tags/section-api.svg`, `assets/tags/light/section-api.svg`, generated plugin skill references: document command/env and endpoint count. +- `test/diagnostics.test.ts`, `test/api-boundary-coverage.test.ts`, `test/cli-http-auth.test.ts`, `test/session-end-triggers-graph.test.ts`, `test/consistency.test.ts`, `test/plugin-surface-contract.test.ts`: focused contract updates. 
+ +## Task 1: Core Reaper + +**Files:** +- Create: `src/functions/session-reaper.ts` +- Test: `test/session-reaper.test.ts` + +- [ ] **Step 1: Write failing parser and candidate tests** + +Add tests covering: + +```ts +expect(parseSessionReapThreshold("4h")).toEqual({ ok: true, valueMs: 14_400_000 }); +expect(parseSessionReapThreshold("30m")).toEqual({ ok: true, valueMs: 1_800_000 }); +expect(parseSessionReapThreshold("250ms")).toEqual({ ok: true, valueMs: 250 }); +expect(parseSessionReapThreshold(1000)).toEqual({ ok: true, valueMs: 1000 }); +expect(parseSessionReapThreshold("0")).toMatchObject({ ok: false }); +expect(parseSessionReapThreshold("1h30m")).toMatchObject({ ok: false }); +``` + +Add tests proving newest valid last activity wins and future activity is skipped: + +```ts +expect(candidate.lastActivitySource).toBe("observation"); +expect(candidate.lastActivityAt).toBe("2026-06-19T11:00:00.000Z"); +expect(result.sessions[0].reason).toBe("future_last_activity"); +``` + +Add cost-safety tests proving: +- a fresh active session with valid `updatedAt` newer than threshold does not call `kv.list(KV.observations(sessionId))`. +- a stale `updatedAt` with a recent observation skips recovery. +- invalid `updatedAt` falls back to observations, then `startedAt`. +- sessions with all invalid or future timestamps are skipped and reported. + +- [ ] **Step 2: Run RED** + +Run: + +```bash +corepack pnpm exec vitest run test/session-reaper.test.ts +``` + +Expected: fails because `src/functions/session-reaper.ts` does not exist. 
+ +- [ ] **Step 3: Implement threshold and candidate helpers** + +Implement exported helpers: + +```ts +export const DEFAULT_SESSION_REAP_THRESHOLD_MS = 4 * 60 * 60 * 1000; +export function parseSessionReapThreshold(raw: unknown): ThresholdParseResult; +export function resolveSessionReapThresholdMs(input?: { threshold?: unknown; thresholdMs?: unknown }, env?: Record): ThresholdParseResult; +export async function getSessionLastActivity(kv: StateKV, session: Session): Promise; +export async function listAbandonedSessionCandidates(kv: StateKV, opts: { thresholdMs: number; now?: Date }): Promise; +``` + +Candidate detection rules: +- Inspect only active sessions. +- Use valid `updatedAt` when present, otherwise valid `startedAt`, as a cheap prefilter before listing observations. +- Skip observation listing for sessions whose valid session-level timestamp is newer than the threshold. +- List observations only for stale or invalid session-level timestamps; use the newest valid observation timestamp when present. +- Treat future last-activity timestamps as skipped evidence, not recoverable candidates. +- Process recoveries sequentially or with an explicit small concurrency bound. + +- [ ] **Step 4: Write failing recovery tests** + +Add tests for: +- dry run has candidates but no `event::session::stopped`, no `mem::consolidate-pipeline`, no audit, no status change. +- real run sets `status: "completed"` and `endedAt` to the last activity timestamp. +- real run awaits `event::session::stopped` once per recovered session. +- consolidation runs once after multiple recovered sessions. +- stopped-pipeline failure leaves the session completed, records `pipelineOk: false`, and returns the failure detail. +- concurrent/stale re-read skips a session that became completed under the lock. +- a fresh observation/update landing under the shared observation lock causes the re-fetch/recompute path to skip recovery. 
+ +- [ ] **Step 5: Run RED for recovery tests** + +Run: + +```bash +corepack pnpm exec vitest run test/session-reaper.test.ts +``` + +Expected: parser/candidate tests pass; recovery tests fail because `registerSessionReaperFunction` is not implemented. + +- [ ] **Step 6: Implement `mem::session-reap`** + +Implementation rules: +- Register function id `mem::session-reap`. +- Validate `dryRun`, `threshold`, and `thresholdMs`. +- Use `withKeyedLock("obs:" + session.id, ...)` so recovery serializes with `mem::observe`. +- Re-fetch and recompute inside the lock. +- Set completed state with `kv.set(KV.sessions, id, { ...fresh, status: "completed", endedAt: lastActivityAt })`. +- Await `event::session::stopped`. +- Record audit with `operation: "heal"`, `functionId: "mem::session-reap"`, and reason `abandoned-session-reap`. +- Trigger `mem::consolidate-pipeline` once after recovered sessions. + +- [ ] **Step 7: Run GREEN** + +Run: + +```bash +corepack pnpm exec vitest run test/session-reaper.test.ts +``` + +Expected: all tests in `test/session-reaper.test.ts` pass. + +## Task 2: Startup Scheduler + +**Files:** +- Create: `src/functions/session-reaper-scheduler.ts` +- Modify: `src/index.ts` +- Test: `test/session-reaper-scheduler.test.ts` + +- [ ] **Step 1: Write failing scheduler tests** + +Add tests for: +- default schedules an unref'd startup timeout. +- `AGENTMEMORY_REAP_ENABLED=false` schedules nothing. +- `AGENTMEMORY_REAP_ON_STARTUP=false` disables only startup. +- unset or `0` `AGENTMEMORY_REAP_INTERVAL_MS` does not schedule periodic reaping. +- positive `AGENTMEMORY_REAP_INTERVAL_MS` schedules interval. +- `AGENTMEMORY_REAP_THRESHOLD_MS=60000` is passed as `{ thresholdMs: 60000 }` for startup and periodic payloads. +- invalid threshold env values fall back to default with a warning. +- trigger failures are caught and logged. 
+- `src/index.ts` imports/registers the reaper function, starts the scheduler, stores both handles, and clears timeout/interval during shutdown. + +- [ ] **Step 2: Run RED** + +Run: + +```bash +corepack pnpm exec vitest run test/session-reaper-scheduler.test.ts +``` + +Expected: fails because scheduler module does not exist. + +- [ ] **Step 3: Implement scheduler and wire index** + +Implement: + +```ts +export function startSessionReaper( + sdk: Pick, + log: LoggerLike, + env: Record, + setTimeoutFn = setTimeout, + setIntervalFn = setInterval, +): SessionReaperTimers | null; +``` + +Then import/register in `src/index.ts`: +- `registerSessionReaperFunction(sdk, kv)` in `registerAllFunctions`. +- `const sessionReaperTimers = startSessionReaper(...)` after eviction scheduler setup. +- Clear startup timeout and interval during shutdown. +- Pass the resolved threshold in scheduler trigger payloads. +- Update boot log endpoint count to 135. + +- [ ] **Step 4: Run GREEN** + +Run: + +```bash +corepack pnpm exec vitest run test/session-reaper-scheduler.test.ts +``` + +Expected: scheduler tests pass. + +## Task 3: REST API + +**Files:** +- Modify: `src/triggers/api.ts` +- Test: `test/api-boundary-coverage.test.ts` + +- [ ] **Step 1: Write failing API boundary tests** + +Add `api::session-reap` to protected handlers and success matrix. 
Add assertions: + +```ts +await expect(handler(req({ body: { threshold: "bad" } }))).resolves.toMatchObject({ status_code: 400 }); +await expect(handler(req({ body: { threshold: "4h", thresholdMs: 1000 } }))).resolves.toMatchObject({ status_code: 400 }); +await handler(req({ body: { dryRun: true, threshold: "4h", ignored: true } })); +expect(sdk.triggerCalls.at(-1)).toEqual({ + function_id: "mem::session-reap", + payload: { source: "api", dryRun: true, thresholdMs: 14_400_000 }, +}); +``` + +- [ ] **Step 2: Run RED** + +Run: + +```bash +corepack pnpm exec vitest run test/api-boundary-coverage.test.ts +``` + +Expected: fails because `api::session-reap` is not registered. + +- [ ] **Step 3: Implement API handler** + +Add `api::session-reap`: +- `POST /agentmemory/sessions/reap` +- `checkAuth(req, secret)` first. +- Accept only `dryRun`, `threshold`, and `thresholdMs` from body/query. +- Reject both `threshold` and `thresholdMs`. +- Trigger `mem::session-reap` with `{ source: "api", dryRun, thresholdMs }`. + +- [ ] **Step 4: Run GREEN** + +Run: + +```bash +corepack pnpm exec vitest run test/api-boundary-coverage.test.ts +``` + +Expected: API boundary coverage passes. + +## Task 4: CLI + +**Files:** +- Modify: `src/cli.ts` +- Test: `test/cli-http-auth.test.ts`, `test/session-end-triggers-graph.test.ts` + +- [ ] **Step 1: Write failing CLI tests** + +Add tests proving: +- `agentmemory reap --dry-run --threshold 4h` probes `/agentmemory/livez`. +- It posts `/agentmemory/sessions/reap`. +- It sends file-backed bearer auth without leaking the secret. +- It sends body `{ dryRun: true, thresholdMs: 14400000 }`. +- invalid threshold exits before POST. +- ambiguous `--threshold` plus `--threshold-ms` exits before `/livez` or POST. +- help text and command map include `reap`. + +- [ ] **Step 2: Run RED** + +Run: + +```bash +corepack pnpm exec vitest run test/cli-http-auth.test.ts test/session-end-triggers-graph.test.ts +``` + +Expected: fails because `reap` command is absent. 
+ +- [ ] **Step 3: Implement `runReap`** + +Implement CLI parsing for: +- `--dry-run` +- `--threshold <duration>` +- `--threshold=<duration>` +- `--threshold-ms <ms>` +- `--threshold-ms=<ms>` + +Behavior: +- Probe `livez`; do not auto-start daemon. +- Use `buildJsonRequestHeaders`. +- POST normalized JSON to `/agentmemory/sessions/reap`. +- Print recovered/skipped/failed counts. +- Exit nonzero on invalid flags, HTTP errors, and non-JSON responses. + +- [ ] **Step 4: Run GREEN** + +Run: + +```bash +corepack pnpm exec vitest run test/cli-http-auth.test.ts test/session-end-triggers-graph.test.ts +``` + +Expected: CLI tests pass. + +## Task 5: Diagnostics And Heal + +**Files:** +- Modify: `src/functions/diagnostics.ts` +- Test: `test/diagnostics.test.ts` + +- [ ] **Step 1: Write failing diagnostics tests** + +Add/update tests for: +- active session idle beyond 4h produces `abandoned-session:<sessionId>` with `fixable: true`. +- old `startedAt` plus recent observation does not warn. +- exact threshold age warns because recovery uses `>=`. +- all-invalid timestamps do not crash diagnostics. +- fresh `updatedAt` sessions do not enumerate observations. +- `mem::heal` with `categories: ["sessions"]` triggers `mem::session-reap` with `{ source: "heal", dryRun }`. + +- [ ] **Step 2: Run RED** + +Run: + +```bash +corepack pnpm exec vitest run test/diagnostics.test.ts +``` + +Expected: fails on old hard-coded 24h session logic and missing heal branch. + +- [ ] **Step 3: Implement diagnostics changes** + +Use `listAbandonedSessionCandidates` for sessions category. In `mem::heal`, add a `sessions` branch that calls `mem::session-reap` and appends result details. + +- [ ] **Step 4: Run GREEN** + +Run: + +```bash +corepack pnpm exec vitest run test/diagnostics.test.ts +``` + +Expected: diagnostics tests pass. 
+ +## Task 6: Docs And Counts + +**Files:** +- Modify: `README.md`, `AGENTS.md`, `.env.example`, `assets/tags/section-api.svg`, `assets/tags/light/section-api.svg`, generated plugin skill references + +- [ ] **Step 1: Update docs** + +Add: +- `agentmemory reap --threshold 4h` +- `agentmemory reap --dry-run` +- `AGENTMEMORY_REAP_THRESHOLD_MS=14400000` +- `AGENTMEMORY_REAP_ENABLED=true` +- `AGENTMEMORY_REAP_ON_STARTUP=true` +- `AGENTMEMORY_REAP_INTERVAL_MS=0` + +Update REST endpoint count from `134` to `135`. +Run `corepack pnpm run skills:gen` after adding `AGENTMEMORY_REAP_*` usage and the new endpoint so generated plugin references stay in sync. + +- [ ] **Step 2: Search stale references** + +Run: + +```bash +rg -n "previous endpoint count|AGENTMEMORY_REAP|sessions/reap|agentmemory reap" README.md AGENTS.md .env.example assets/tags plugin src test +``` + +Expected: no stale previous endpoint count; new reaper references are intentional. + +## Task 7: Integrated Verification And Cleanup + +**Files:** all touched files. + +- [ ] **Step 1: Run focused regression** + +Run: + +```bash +corepack pnpm exec vitest run test/session-reaper.test.ts test/session-reaper-scheduler.test.ts test/api-boundary-coverage.test.ts test/cli-http-auth.test.ts test/session-end-triggers-graph.test.ts test/diagnostics.test.ts +``` + +Expected: focused suite passes. + +Also run: + +```bash +corepack pnpm exec vitest run test/consistency.test.ts test/plugin-surface-contract.test.ts +corepack pnpm run skills:check +``` + +Expected: endpoint counts and generated plugin references match source. + +- [ ] **Step 2: Focused simplification pass** + +Inspect only the touched implementation files. Remove duplicated parsing branches, unused exports, or unnecessary comments while preserving API/auth/persistence/lifecycle behavior. + +- [ ] **Step 3: Run focused regression again** + +Run the same focused command. Expected: pass. 
+ +- [ ] **Step 4: Run broader repo-native test** + +Run: + +```bash +corepack pnpm test +``` + +Expected: full non-integration suite passes. If pnpm hardening blocks, run `corepack pnpm install --frozen-lockfile --ignore-scripts` and retry per repo instructions. + +## Feature / Verification Matrix + +| Change | Verification method | Status | +| --- | --- | --- | +| Core reaper and threshold parsing | `test/session-reaper.test.ts` | Pending | +| Startup reconciliation | `test/session-reaper-scheduler.test.ts` | Pending | +| REST endpoint | `test/api-boundary-coverage.test.ts` | Pending | +| CLI command | `test/cli-http-auth.test.ts`, source contract test | Pending | +| Diagnostics/heal | `test/diagnostics.test.ts` | Pending | +| Docs/counts | `rg` stale-reference check, `test/consistency.test.ts`, `test/plugin-surface-contract.test.ts`, `corepack pnpm run skills:check` | Pending | +| Integrated behavior | Focused Vitest + generated-surface checks + full `corepack pnpm test` | Pending | + +## Security And PR Prep Notes + +Changed surfaces include REST, CLI, lifecycle/persistence behavior, diagnostics, and agent workflow docs. Before any commit or PR readiness claim, run the required review chain from `github-push-prepare`, including passive security review, simple-code cleanup, adversarial implementation review, and required scanner gates if staging/committing. 
diff --git a/docs/todos/2026-06-19-issue-298-abandoned-session-recovery/todo.md b/docs/todos/2026-06-19-issue-298-abandoned-session-recovery/todo.md new file mode 100644 index 00000000..ecb6c592 --- /dev/null +++ b/docs/todos/2026-06-19-issue-298-abandoned-session-recovery/todo.md @@ -0,0 +1,210 @@ +# Issue 298 Abandoned Session Recovery + +Task id: `2026-06-19-issue-298-abandoned-session-recovery` + +## Scope + +Implement GitHub issue +[#298](https://github.com/wbugitlab1/agentmemory/issues/298) in the dedicated +worktree `/Users/A1538552/.codex/worktrees/3a6d/agentmemory` on branch +`issue/298-abandoned-session-recovery`, targeting only `origin`. + +Parent batch record: +`/Users/A1538552/_projects/_tools/agentmemory/docs/todos/2026-06-19-issue-triage-batch-288-312/todo.md`. + +## Initial Evidence + +- Active instructions: root `AGENTS.md` and delegated issue instructions read. +- Git state before branch creation: detached HEAD at + `499b53fc4a0f58d6f7b2daf674a7943de023d75a`, matching local `origin/main`. +- Branch result: created and switched to + `issue/298-abandoned-session-recovery`. +- Issue validity: `gh issue view 298 --repo wbugitlab1/agentmemory --json ...` + shows open issue titled `feat(recovery): abandoned session detection and + backfill for abruptly terminated sessions`, updated 2026-06-15. +- Existing gap: `src/functions/evict.ts` recovers only during 30-day stale + deletion and then deletes sessions; `src/functions/diagnostics.ts` only warns + on active sessions older than 24 hours by `startedAt`; `src/cli.ts` has no + `reap` command; `src/index.ts` has no startup reconciliation for abandoned + sessions. + +## Sprint Contract + +Goal: add a bounded backend recovery path that detects active sessions whose +last activity is older than a configurable threshold, runs the existing +session-end/consolidation lifecycle, marks them completed with `endedAt`, and +exposes the recovery through CLI/API/startup/diagnostics. 
+ +Scope: +- Add reusable abandoned-session detection and recovery logic using existing + `KV.sessions`, observation scopes, `event::session::stopped`, + `mem::consolidate-pipeline`, and `recordAudit`. +- Add `agentmemory reap` CLI support and a REST endpoint that whitelists inputs. +- Run startup/periodic reconciliation from the worker when enabled by env. +- Update `mem::diagnose` session checks to use the new threshold and actionable + abandoned-session wording. +- Add focused tests for recovery, API/CLI wiring, scheduler behavior, and + diagnostics. +- Update docs/config references for the new env vars and command. + +Non-goals: +- Do not add new persistence schemas, migrations, external services, or + dependencies. +- Do not change auth behavior, session IDs, tenant/project scoping, or existing + `session/end` API semantics. +- Do not implement Hermes JSONL import support here; that belongs to issue + #297. +- Do not build the viewer banner unless the backend slice is complete and the + change remains narrow. + +Acceptance criteria: +- Stale active sessions are detected by `updatedAt` when present, otherwise + latest observation timestamp, otherwise `startedAt`. +- Recovery skips active sessions newer than the threshold and supports dry-run. +- Recovery marks abandoned sessions `completed`, sets `endedAt`, runs + `event::session::stopped`, runs consolidation once when at least one session + recovered, and logs an audit entry. +- CLI `agentmemory reap --threshold=4h` calls the backend recovery endpoint and + reports recovered/skipped counts. +- Startup reconciliation can run once on boot and scheduled daemon mode can run + when configured. +- Diagnostics surface abandoned sessions with actionable recovery guidance. + +Intended verification: +- RED/GREEN targeted Vitest tests for the new recovery path. +- `corepack pnpm exec vitest run` on the focused changed test set. +- `corepack pnpm test` if the focused suite passes and runtime budget permits. 
+- Required security gates before commit/PR if implementation proceeds to push: + `gitleaks protect --staged --redact`; Semgrep/OSV as required by changed + surfaces or repo-local gate availability. + +Known boundaries: +- Full GitHub feature-loop remote writes are authorized by delegation only after + green verification and without Human Checkpoint triggers. +- Any schema, dependency, auth, or persistence model expansion stops for a + Human Checkpoint. +- This task touches backend runtime behavior and agent workflow docs, so final + verification must include security-sensitive surface review. + +## Arena Checklist + +- [x] Frame +- [x] Fan out +- [x] Cross-judge +- [x] Pick +- [x] Graft +- [x] Verify + +## Arena Synthesis + +Base: candidate 1 +(`/private/tmp/arena-issue298/candidate-1/design.md`). + +Reason: candidate 1 has the most coherent core recovery semantics: +`mem::session-reap`, `/agentmemory/sessions/reap`, keyed locking, durable +completion, `endedAt` set to last known activity, dry-run no-write behavior, +per-session audit details, single post-batch consolidation, and no MCP or +persistence-scope churn. + +Cross-judge: `/private/tmp/arena-issue298/judge/report.md` recommended +candidate 1 as base. Scores: candidate 1 = 22, candidate 2 = 22, candidate 3 = +19. Candidate 1 won on maintainable recovery shape; candidate 2's strongest +ideas are grafts. + +Grafts: +- From candidate 2: compute last activity as the newest valid timestamp across + `startedAt`, `updatedAt`, and observations, not `updatedAt` alone. +- From candidate 2: reject explicit ambiguous `threshold` plus `thresholdMs`; + fail invalid API/CLI input fast; let invalid env values fall back with a + warning. +- From candidate 2: keep CLI safe with `/livez` probe, existing + `buildJsonRequestHeaders`, compact output, and no daemon auto-start. +- From candidate 3: periodic reconciliation is opt-in only; startup + reconciliation can be default-on and backgrounded. 
+- From candidate 3: future timestamps are skipped/reported and never clamped. + +Rejected: +- New audit operation `"session_reap"`; use existing `"heal"` audit semantics + to avoid changing the audit contract. +- Default-on periodic reaper; this could close intentionally idle sessions. +- New MCP tool; requested surfaces are backend, CLI, startup, and diagnostics. +- Broad `api::session::end` refactor or `evict.ts` cleanup unless implementation + requires a narrow shared helper. +- `runConsolidation` / `--no-consolidation` controls; recovery should run the + expected consolidation path. +- Lifecycle order that leaves sessions active when `event::session::stopped` + fails. + +## Feature / Verification Matrix + +| Change | Verification method | Status | Evidence | +| --- | --- | --- | --- | +| Branch/worktree setup | Git commands | Done | `git switch -c issue/298-abandoned-session-recovery`; `git status -sb` clean on branch | +| Issue legitimacy | GitHub issue read + repo search | Done | Open issue #298; repo lacks `reap` and startup reconciliation | +| Arena synthesis | Candidate outputs + judge | Done | Base candidate 1 with grafts from candidates 2 and 3 | +| Recovery function | Targeted Vitest | Done | `corepack pnpm exec vitest run test/session-reaper.test.ts` passed; covers parsing, candidate detection, dry-run, lifecycle, audit, consolidation, and observation-lock race | +| CLI/API wiring | Targeted Vitest + source contract tests | Done | `test/api-boundary-coverage.test.ts`, `test/cli-http-auth.test.ts`, and `test/session-end-triggers-graph.test.ts` passed in focused suite; empty-body query fallback and missing CLI flag-value edge cases are covered | +| Diagnostics | Targeted Vitest | Done | `corepack pnpm exec vitest run test/diagnostics.test.ts` passed; sessions category and heal delegation covered | +| Startup scheduler | Targeted Vitest/source contract | Done | `corepack pnpm exec vitest run test/session-reaper-scheduler.test.ts` passed; env threshold 
payload, failure logging, and index cleanup wiring covered | +| Docs/counts | Generated checks | Done | `corepack pnpm run skills:gen`; stale-reference `rg`; `test/consistency.test.ts`; `test/plugin-surface-contract.test.ts`; `corepack pnpm run skills:check` all passed; after merging current `origin/main`, regenerated config reference count from 76 to 80 and `skills:check` passed | +| Final regression | Repo-native CI commands | Done | Pre-merge checks passed; after merging current `origin/main`, focused suite passed (6 files, 130 tests), `corepack pnpm run lint`, `corepack pnpm run build`, `corepack pnpm run coverage` (211 files, 2901 tests), `corepack pnpm test` (211 files, 2901 tests), and `corepack pnpm run skills:check` passed | +| Security gates | Semgrep + staged Gitleaks | Done | Pre-merge Semgrep and staged Gitleaks passed; post-merge `semgrep scan --config p/default --error --metrics=off .` passed with 0 findings; post-merge `gitleaks protect --staged --redact` passed with no leaks | + +## Subagent Ledger + +| Workstream | Scope | Edits allowed | Expected output | Result | Residual risk | +| --- | --- | --- | --- | --- | --- | +| Arena candidate 1 | Recovery design only | No | Candidate design and rationale | Done: base design selected | Synthesized with grafts; not copied verbatim | +| Arena candidate 2 | Recovery design only | No | Candidate design and rationale | Done: threshold/input-safety grafts accepted | Synthesized with base | +| Arena candidate 3 | Recovery design only | No | Candidate design and rationale | Done: opt-in periodic and future-timestamp grafts accepted | Synthesized with base | +| Arena judge | Candidate review only | No | Rubric scoring and base recommendation | Done: recommended candidate 1 base with candidate 2/3 grafts | Main agent retained final responsibility | +| Pre-code spec reviewer | Plan and task-state review | No | Missing acceptance criteria and race risks | Done: found shared-lock, scheduler threshold, and CLI 
ambiguity gaps; accepted into plan | Implementation must prove fixes with tests | +| Pre-code verification reviewer | Planned verification review | No | Missing verification surfaces | Done: found index wiring, generated docs/count, and threshold edge coverage gaps; accepted into plan | Full checks still required | +| Pre-code architecture reviewer | Architecture and iii-engine safety review | No | Boundary and lifecycle risks | Done: found endpoint-count surface and unbounded startup scan risks; accepted into plan | Legacy sessions without `updatedAt` still require bounded observation scans | + +## Progress Notes + +- 2026-06-19: Confirmed working directory, instructions, clean worktree, + branch, issue evidence, relevant README/scripts, and core session lifecycle + files before implementation edits. +- 2026-06-19: Completed arena: three read-only candidate designs plus one + read-only judge. Chose candidate 1 as base, grafted candidate 2 last-activity + and input-safety rules, and grafted candidate 3 opt-in periodic stance. +- 2026-06-19: Completed pre-code review. Accepted findings into + `plan.md`: recovery must use the existing `obs:` lock, scheduler + must pass threshold env payloads and warn on invalid env, CLI/API must reject + ambiguous threshold inputs, index wiring and generated count surfaces need + explicit tests, and startup scans must prefilter by session-level timestamps + before listing observations. +- 2026-06-19: Implemented backend recovery (`mem::session-reap`), startup and + optional interval scheduler, REST endpoint, CLI `reap`, diagnostics/heal + integration, docs/env references, endpoint-count surfaces, and generated + plugin references. No schema, dependency, MCP-tool, auth, or viewer-surface + expansion was added. +- 2026-06-19: Final review added API query fallback coverage for empty body + threshold fields and CLI missing-value validation for `--threshold`. 
+- 2026-06-19: Verification passed after the final source edit: API/CLI targeted + suite (2 files, 60 tests), focused reaper/API/CLI/scheduler/diagnostics suite + (6 files, 129 tests), `corepack pnpm run lint`, `corepack pnpm run build`, + full `corepack pnpm test` (209 files, 2855 tests), `corepack pnpm run + skills:check`, and `git diff --check`. +- 2026-06-19: Security gates passed: `semgrep scan --config p/default + --error --metrics=off .` with 0 findings, explicit Semgrep scan of the four + new TypeScript files with 0 findings, and `gitleaks protect --staged + --redact` with no leaks. +- 2026-06-19: Initial PR CI failed in `pnpm run skills:check` because GitHub + tested the synthetic merge commit against newer `origin/main`. Merged current + `origin/main` into the issue branch, regenerated `agentmemory-config` + reference count from 76 to 80 recognized env vars, and verified + `skills:check` passed. +- 2026-06-19: Post-merge verification passed: focused + reaper/API/CLI/scheduler/diagnostics suite (6 files, 130 tests), + `corepack pnpm run lint`, `corepack pnpm run build`, `corepack pnpm run + coverage` (211 files, 2901 tests), `corepack pnpm test` (211 files, 2901 + tests), and `corepack pnpm run skills:check`. +- 2026-06-19: Post-merge Semgrep passed: + `semgrep scan --config p/default --error --metrics=off .` scanned 937 tracked + files with 0 findings. +- 2026-06-19: Post-merge staged Gitleaks passed: + `gitleaks protect --staged --redact` scanned the staged generated-doc/task + record fix with no leaks. diff --git a/plugin/skills/agentmemory-config/REFERENCE.md b/plugin/skills/agentmemory-config/REFERENCE.md index 619d6216..22659a27 100644 --- a/plugin/skills/agentmemory-config/REFERENCE.md +++ b/plugin/skills/agentmemory-config/REFERENCE.md @@ -3,7 +3,7 @@ Generated by scanning `src/` for `AGENTMEMORY_*` usage. Do not edit the block below by hand; run `corepack pnpm run skills:gen` after adding or removing a variable. 
Internal markers ending in two underscores are excluded. -Configuration is read from the environment and from `~/.agentmemory/.env` (no `export` prefix). 76 recognized variables: +Configuration is read from the environment and from `~/.agentmemory/.env` (no `export` prefix). 80 recognized variables: - `AGENTMEMORY_AGENT_ID` - `AGENTMEMORY_AGENT_SCOPE` @@ -63,6 +63,10 @@ Configuration is read from the environment and from `~/.agentmemory/.env` (no `e - `AGENTMEMORY_PROJECT_NAME` - `AGENTMEMORY_PROVIDER` - `AGENTMEMORY_READY_TIMEOUT_MS` +- `AGENTMEMORY_REAP_ENABLED` +- `AGENTMEMORY_REAP_INTERVAL_MS` +- `AGENTMEMORY_REAP_ON_STARTUP` +- `AGENTMEMORY_REAP_THRESHOLD_MS` - `AGENTMEMORY_REFLECT` - `AGENTMEMORY_REMOTE_REQUIRED` - `AGENTMEMORY_REQUIRE_HTTPS` diff --git a/plugin/skills/agentmemory-rest-api/REFERENCE.md b/plugin/skills/agentmemory-rest-api/REFERENCE.md index 9014e17e..790166fc 100644 --- a/plugin/skills/agentmemory-rest-api/REFERENCE.md +++ b/plugin/skills/agentmemory-rest-api/REFERENCE.md @@ -5,7 +5,7 @@ Generated from `src/triggers/api.ts`. Do not edit the block below by hand; run ` The REST API is the primary surface. All paths are under `http://localhost:3111` (override with `--port`). When `AGENTMEMORY_SECRET` is set, send `Authorization: Bearer $AGENTMEMORY_SECRET`; localhost is otherwise open. -134 registered endpoints: +135 registered endpoints: | Method | Path | | --- | --- | @@ -115,6 +115,7 @@ The REST API is the primary surface. 
All paths are under `http://localhost:3111` | POST | `/agentmemory/session/end` | | POST | `/agentmemory/session/start` | | GET | `/agentmemory/sessions` | +| POST | `/agentmemory/sessions/reap` | | GET | `/agentmemory/signals` | | POST | `/agentmemory/signals/send` | | GET | `/agentmemory/sketches` | diff --git a/src/cli.ts b/src/cli.ts index 10cf2ff2..1b8c5cf8 100644 --- a/src/cli.ts +++ b/src/cli.ts @@ -85,6 +85,7 @@ import { } from "./cli/readiness-timeout.js"; import { renderPreparedRuntimeIiiConfig } from "./cli/runtime-config.js"; import { materializeUserIiiConfig } from "./cli/iii-config.js"; +import { resolveSessionReapThresholdMs } from "./functions/session-reaper.js"; const ALL_TOOLS_COUNT = getAllTools().length; const CORE_TOOLS_COUNT = getAllTools().filter((t) => ESSENTIAL_TOOLS.has(t.name)).length; @@ -184,6 +185,8 @@ Commands: import-jsonl [p] Import Claude Code JSONL transcripts (default: ~/.claude/projects) --max-files | --max-files=: override scan cap (default 200, max 1000; out-of-range is rejected; for trees >1000 files, batch by subdirectory) + reap [--dry-run] Recover abandoned active sessions. 
+ agentmemory reap --threshold 4h Options: --help, -h Show this help @@ -3079,6 +3082,129 @@ async function runImportJsonl(): Promise { } } +async function runReap(): Promise { + let dryRun = false; + let threshold: unknown; + let thresholdMs: unknown; + let flagError: string | null = null; + const tail = args.slice(1); + + for (let i = 0; i < tail.length; i++) { + const arg = tail[i]!; + if (arg === "--dry-run") { + dryRun = true; + continue; + } + if (arg === "--threshold") { + const value = tail[i + 1]; + if (value === undefined || value.startsWith("--")) { + flagError = "--threshold requires a value"; + break; + } + threshold = value; + i++; + continue; + } + if (arg.startsWith("--threshold=")) { + threshold = arg.slice("--threshold=".length); + continue; + } + if (arg === "--threshold-ms") { + const value = tail[i + 1]; + if (value === undefined || value.startsWith("--")) { + flagError = "--threshold-ms requires a value"; + break; + } + thresholdMs = value; + i++; + continue; + } + if (arg.startsWith("--threshold-ms=")) { + thresholdMs = arg.slice("--threshold-ms=".length); + continue; + } + } + + if (flagError) { + p.log.error(flagError); + process.exit(1); + } + + const parsed = resolveSessionReapThresholdMs({ threshold, thresholdMs }, {}); + if (!parsed.ok) { + p.log.error(parsed.error); + process.exit(1); + } + + const base = getBaseUrl(); + let probeOk = false; + let probeDetail = ""; + try { + const probe = await fetch(`${base}/agentmemory/livez`, { + signal: AbortSignal.timeout(2000), + }); + probeOk = probe.ok; + if (!probeOk) { + const body = await probe.text().catch(() => ""); + probeDetail = `reachable but unhealthy (HTTP ${probe.status}${body ? `: ${body.slice(0, 200)}` : ""})`; + } + } catch (err) { + probeDetail = `unreachable (${err instanceof Error ? err.message : String(err)})`; + } + if (!probeOk) { + p.log.error( + `agentmemory livez probe failed at ${base}: ${probeDetail}. 
Start it with \`npx @agentmemory/agentmemory\` in another terminal, then re-run this command.`, + ); + process.exit(1); + } + + const url = `${base}/agentmemory/sessions/reap`; + const headers = buildJsonRequestHeaders(url); + if (!headers.ok) { + p.log.error(headers.message); + process.exit(1); + } + + try { + const res = await fetch(url, { + method: "POST", + headers: headers.headers, + body: JSON.stringify({ dryRun, thresholdMs: parsed.valueMs }), + signal: AbortSignal.timeout(120_000), + }); + const text = await res.text(); + let json: { + success?: boolean; + error?: string; + eligible?: number; + recovered?: number; + skipped?: number; + failed?: number; + } = {}; + if (text.length > 0) { + try { + json = JSON.parse(text); + } catch { + p.log.error( + `server returned non-JSON response (HTTP ${res.status}): ${text.slice(0, 200)}`, + ); + process.exit(1); + } + } + if (!res.ok || json.success === false) { + p.log.error(json.error || `reap failed with HTTP ${res.status}`); + process.exit(1); + } + + console.log( + `reap: eligible=${json.eligible ?? 0} recovered=${json.recovered ?? 0} skipped=${json.skipped ?? 0} failed=${json.failed ?? 0}`, + ); + } catch (err) { + p.log.error(err instanceof Error ? err.message : String(err)); + process.exit(1); + } +} + // --------------------------------------------------------------------------- // `agentmemory remove` — clean uninstall. // @@ -3232,6 +3358,7 @@ const commands: Record Promise> = { remove: runRemove, mcp: runMcp, "import-jsonl": runImportJsonl, + reap: runReap, }; const handler = commands[args[0] ?? ""] ?? 
main; diff --git a/src/functions/diagnostics.ts b/src/functions/diagnostics.ts index cc982883..868ca632 100644 --- a/src/functions/diagnostics.ts +++ b/src/functions/diagnostics.ts @@ -3,6 +3,10 @@ import type { StateKV } from "../state/kv.js"; import { KV } from "../state/schema.js"; import { withKeyedLock } from "../state/keyed-mutex.js"; import { recordAudit } from "./audit.js"; +import { + DEFAULT_SESSION_REAP_THRESHOLD_MS, + listAbandonedSessionCandidates, +} from "./session-reaper.js"; import type { Action, ActionEdge, @@ -40,7 +44,6 @@ const ALL_CATEGORIES = [ "mesh", ]; -const TWENTY_FOUR_HOURS_MS = 24 * 60 * 60 * 1000; const ONE_HOUR_MS = 60 * 60 * 1000; export function registerDiagnosticsFunction(sdk: ISdk, kv: StateKV): void { @@ -289,25 +292,22 @@ export function registerDiagnosticsFunction(sdk: ISdk, kv: StateKV): void { if (categories.includes("sessions")) { const sessions = await kv.list(KV.sessions); - let sessionIssues = 0; + const candidates = await listAbandonedSessionCandidates(kv, { + thresholdMs: DEFAULT_SESSION_REAP_THRESHOLD_MS, + now: new Date(now), + }); - for (const session of sessions) { - if ( - session.status === "active" && - now - new Date(session.startedAt).getTime() > TWENTY_FOUR_HOURS_MS - ) { - checks.push({ - name: `abandoned-session:${session.id}`, - category: "sessions", - status: "warn", - message: `Session ${session.id} has been active for over 24 hours`, - fixable: false, - }); - sessionIssues++; - } + for (const candidate of candidates) { + checks.push({ + name: `abandoned-session:${candidate.sessionId}`, + category: "sessions", + status: "warn", + message: `Session ${candidate.sessionId} has been inactive for at least 4 hours; run agentmemory reap --threshold 4h`, + fixable: true, + }); } - if (sessionIssues === 0) { + if (candidates.length === 0) { checks.push({ name: "sessions-ok", category: "sessions", @@ -1008,6 +1008,26 @@ export function registerDiagnosticsFunction(sdk: ISdk, kv: StateKV): void { } } + if 
(categories.includes("sessions")) { + const result = await sdk.trigger({ + function_id: "mem::session-reap", + payload: { source: "heal", dryRun }, + }) as { + eligible?: number; + recovered?: number; + skipped?: number; + failed?: number; + }; + const recoverable = dryRun ? result.eligible ?? 0 : result.recovered ?? 0; + fixed += recoverable; + skipped += (result.skipped ?? 0) + (result.failed ?? 0); + details.push( + dryRun + ? `[dry-run] Would recover ${recoverable} abandoned session(s)` + : `Recovered ${recoverable} abandoned session(s)`, + ); + } + if (categories.includes("memories")) { const memories = await kv.list(KV.memories); const supersededBy = new Map(); diff --git a/src/functions/session-reaper-scheduler.ts b/src/functions/session-reaper-scheduler.ts new file mode 100644 index 00000000..9fa62ec8 --- /dev/null +++ b/src/functions/session-reaper-scheduler.ts @@ -0,0 +1,85 @@ +import type { ISdk } from "iii-sdk"; +import { + DEFAULT_SESSION_REAP_THRESHOLD_MS, + resolveSessionReapThresholdMs, +} from "./session-reaper.js"; + +export { DEFAULT_SESSION_REAP_THRESHOLD_MS }; + +type StartupTimerHandle = ReturnType; +type IntervalTimerHandle = ReturnType; +type LoggerLike = { + warn: (message: string, meta?: Record) => void; +}; +type SetTimeoutFn = ( + callback: () => Promise, + delayMs: number, +) => StartupTimerHandle; +type SetIntervalFn = ( + callback: () => Promise, + intervalMs: number, +) => IntervalTimerHandle; + +export interface SessionReaperTimers { + startupTimer?: StartupTimerHandle; + intervalTimer?: IntervalTimerHandle; +} + +function parseIntervalMs(value: string | undefined): number | null { + if (value === undefined || value.trim().length === 0) return null; + const parsed = Number.parseInt(value, 10); + if (!Number.isSafeInteger(parsed) || parsed < 0) return null; + return parsed > 0 ? 
parsed : null; +} + +export function startSessionReaper( + sdk: Pick, + log: LoggerLike, + env: Record, + setTimeoutFn: SetTimeoutFn = setTimeout, + setIntervalFn: SetIntervalFn = setInterval, +): SessionReaperTimers | null { + if (env["AGENTMEMORY_REAP_ENABLED"] === "false") return null; + + const threshold = resolveSessionReapThresholdMs({}, env); + const thresholdMs = threshold.ok + ? threshold.valueMs + : DEFAULT_SESSION_REAP_THRESHOLD_MS; + if (threshold.ok && threshold.warning) { + log.warn("Invalid session reaper threshold env; using default", { + error: threshold.warning, + }); + } else if (!threshold.ok) { + log.warn("Invalid session reaper threshold env; using default", { + error: threshold.error, + }); + } + + const run = async (source: "startup" | "scheduler") => { + try { + await sdk.trigger({ + function_id: "mem::session-reap", + payload: { dryRun: false, source, thresholdMs }, + }); + } catch (err) { + log.warn("Session reaper failed", { + source, + error: err instanceof Error ? err.message : String(err), + }); + } + }; + + const timers: SessionReaperTimers = {}; + if (env["AGENTMEMORY_REAP_ON_STARTUP"] !== "false") { + timers.startupTimer = setTimeoutFn(() => run("startup"), 0); + timers.startupTimer.unref?.(); + } + + const intervalMs = parseIntervalMs(env["AGENTMEMORY_REAP_INTERVAL_MS"]); + if (intervalMs !== null) { + timers.intervalTimer = setIntervalFn(() => run("scheduler"), intervalMs); + timers.intervalTimer.unref?.(); + } + + return timers.startupTimer || timers.intervalTimer ? 
timers : null; +} diff --git a/src/functions/session-reaper.ts b/src/functions/session-reaper.ts new file mode 100644 index 00000000..6380bb04 --- /dev/null +++ b/src/functions/session-reaper.ts @@ -0,0 +1,568 @@ +import type { ISdk } from "iii-sdk"; +import type { StateKV } from "../state/kv.js"; +import { KV } from "../state/schema.js"; +import { withKeyedLock } from "../state/keyed-mutex.js"; +import type { CompressedObservation, RawObservation, Session } from "../types.js"; +import { recordAudit } from "./audit.js"; +import { logger } from "../logger.js"; + +export const DEFAULT_SESSION_REAP_THRESHOLD_MS = 4 * 60 * 60 * 1000; + +export type ThresholdParseResult = + | { ok: true; valueMs: number; warning?: string } + | { ok: false; error: string }; + +export type LastActivitySource = "updatedAt" | "startedAt" | "observation"; + +export type LastActivityResult = + | { + ok: true; + at: string; + atMs: number; + source: LastActivitySource; + } + | { + ok: false; + reason: + | "future_last_activity" + | "no_valid_activity" + | "observation_scan_failed"; + at?: string; + atMs?: number; + source?: LastActivitySource; + error?: string; + }; + +export interface AbandonedSessionCandidate { + sessionId: string; + session: Session; + lastActivityAt: string; + lastActivityMs: number; + lastActivitySource: LastActivitySource; + ageMs: number; +} + +interface SkippedSession { + sessionId: string; + status: "skipped"; + reason: + | "future_last_activity" + | "no_valid_activity" + | "observation_scan_failed" + | "session_recent_activity" + | "session_not_active"; + lastActivityAt?: string; + lastActivitySource?: LastActivitySource; + error?: string; +} + +interface SessionReapPayload { + dryRun?: unknown; + threshold?: unknown; + thresholdMs?: unknown; + source?: unknown; +} + +interface SessionReapEntry { + sessionId: string; + status: "eligible" | "recovered" | "skipped" | "failed"; + reason?: string; + lastActivityAt?: string; + lastActivitySource?: LastActivitySource; + 
pipelineOk?: boolean; + error?: string; +} + +const THRESHOLD_RE = /^(\d+)(ms|s|m|h|d)?$/i; + +function isObject(value: unknown): value is Record { + return typeof value === "object" && value !== null && !Array.isArray(value); +} + +function parsePositiveIntegerMs(value: number): ThresholdParseResult { + if (!Number.isSafeInteger(value) || value <= 0) { + return { ok: false, error: "threshold must be a positive integer number of milliseconds" }; + } + return { ok: true, valueMs: value }; +} + +export function parseSessionReapThreshold(raw: unknown): ThresholdParseResult { + if (typeof raw === "number") return parsePositiveIntegerMs(raw); + + if (typeof raw !== "string") { + return { ok: false, error: "threshold must be a positive integer or duration string" }; + } + + const value = raw.trim(); + if (!value) { + return { ok: false, error: "threshold must not be empty" }; + } + + const match = THRESHOLD_RE.exec(value); + if (!match) { + return { + ok: false, + error: "threshold must be milliseconds or a duration like 4h, 30m, 60s, or 250ms", + }; + } + + const amount = Number.parseInt(match[1]!, 10); + const unit = (match[2] ?? "ms").toLowerCase(); + const multiplier = + unit === "d" + ? 86_400_000 + : unit === "h" + ? 3_600_000 + : unit === "m" + ? 60_000 + : unit === "s" + ? 
1_000 + : 1; + return parsePositiveIntegerMs(amount * multiplier); +} + +export function resolveSessionReapThresholdMs( + input: { threshold?: unknown; thresholdMs?: unknown } = {}, + env: Record = process.env, +): ThresholdParseResult { + if (input.threshold !== undefined && input.thresholdMs !== undefined) { + return { ok: false, error: "Use either threshold or thresholdMs, not both" }; + } + + if (input.thresholdMs !== undefined) return parseSessionReapThreshold(input.thresholdMs); + if (input.threshold !== undefined) return parseSessionReapThreshold(input.threshold); + + const envValue = env["AGENTMEMORY_REAP_THRESHOLD_MS"]; + if (envValue !== undefined && envValue.trim().length > 0) { + const parsed = parseSessionReapThreshold(envValue); + if (parsed.ok) return parsed; + return { + ok: true, + valueMs: DEFAULT_SESSION_REAP_THRESHOLD_MS, + warning: parsed.error, + }; + } + + return { ok: true, valueMs: DEFAULT_SESSION_REAP_THRESHOLD_MS }; +} + +function validTimestamp(value: unknown): number | null { + if (typeof value !== "string" || value.trim().length === 0) return null; + const ms = new Date(value).getTime(); + return Number.isFinite(ms) ? 
ms : null; +} + +function toIso(ms: number): string { + return new Date(ms).toISOString(); +} + +function sessionUpdatedAt(session: Session): unknown { + return (session as Session & { updatedAt?: unknown }).updatedAt; +} + +function pickNewer( + current: { atMs: number; source: LastActivitySource } | null, + candidate: { atMs: number; source: LastActivitySource } | null, +): { atMs: number; source: LastActivitySource } | null { + if (!candidate) return current; + if (!current || candidate.atMs > current.atMs) return candidate; + return current; +} + +function getSessionLevelActivity(session: Session): { + atMs: number; + source: "updatedAt" | "startedAt"; +} | null { + let best: { atMs: number; source: "updatedAt" | "startedAt" } | null = null; + const updatedAtMs = validTimestamp(sessionUpdatedAt(session)); + if (updatedAtMs !== null) best = { atMs: updatedAtMs, source: "updatedAt" }; + + const startedAtMs = validTimestamp(session.startedAt); + if (startedAtMs !== null) { + best = pickNewer(best, { atMs: startedAtMs, source: "startedAt" }) as { + atMs: number; + source: "updatedAt" | "startedAt"; + } | null; + } + + return best; +} + +async function listObservationActivity( + kv: StateKV, + sessionId: string, +): Promise { + let observations: Array; + try { + observations = await kv.list( + KV.observations(sessionId), + ); + } catch (err) { + return { + ok: false, + reason: "observation_scan_failed", + error: err instanceof Error ? 
err.message : String(err), + }; + } + + let best: { atMs: number; source: "observation" } | null = null; + for (const observation of observations) { + const timestampMs = validTimestamp(observation.timestamp); + if (timestampMs === null) continue; + best = pickNewer(best, { + atMs: timestampMs, + source: "observation", + }) as { atMs: number; source: "observation" }; + } + + if (!best) return { ok: false, reason: "no_valid_activity" }; + return { + ok: true, + at: toIso(best.atMs), + atMs: best.atMs, + source: "observation", + }; +} + +export async function getSessionLastActivity( + kv: StateKV, + session: Session, +): Promise { + let best = getSessionLevelActivity(session); + const observationActivity = await listObservationActivity(kv, session.id); + if (observationActivity.ok) { + best = pickNewer(best, { + atMs: observationActivity.atMs, + source: observationActivity.source, + }); + } else if (observationActivity.reason === "observation_scan_failed") { + return observationActivity; + } + + if (!best) return { ok: false, reason: "no_valid_activity" }; + return { + ok: true, + at: toIso(best.atMs), + atMs: best.atMs, + source: best.source, + }; +} + +async function getThresholdActivity( + kv: StateKV, + session: Session, + thresholdMs: number, + nowMs: number, +): Promise { + const sessionLevel = getSessionLevelActivity(session); + if (sessionLevel) { + if (sessionLevel.atMs > nowMs) { + return { + ok: false, + reason: "future_last_activity", + at: toIso(sessionLevel.atMs), + atMs: sessionLevel.atMs, + source: sessionLevel.source, + }; + } + if (nowMs - sessionLevel.atMs < thresholdMs) { + return { + ok: true, + at: toIso(sessionLevel.atMs), + atMs: sessionLevel.atMs, + source: sessionLevel.source, + }; + } + } + + const observationActivity = await listObservationActivity(kv, session.id); + if (!observationActivity.ok) { + if (observationActivity.reason === "no_valid_activity" && sessionLevel) { + return { + ok: true, + at: toIso(sessionLevel.atMs), + atMs: 
sessionLevel.atMs, + source: sessionLevel.source, + }; + } + return observationActivity; + } + + const best = pickNewer(sessionLevel, { + atMs: observationActivity.atMs, + source: observationActivity.source, + }); + if (!best) return { ok: false, reason: "no_valid_activity" }; + if (best.atMs > nowMs) { + return { + ok: false, + reason: "future_last_activity", + at: toIso(best.atMs), + atMs: best.atMs, + source: best.source, + }; + } + return { + ok: true, + at: toIso(best.atMs), + atMs: best.atMs, + source: best.source, + }; +} + +async function scanAbandonedSessions( + kv: StateKV, + opts: { thresholdMs: number; now?: Date }, +): Promise<{ candidates: AbandonedSessionCandidate[]; skipped: SkippedSession[] }> { + const nowMs = opts.now?.getTime() ?? Date.now(); + const sessions = await kv.list(KV.sessions); + const candidates: AbandonedSessionCandidate[] = []; + const skipped: SkippedSession[] = []; + + for (const session of sessions) { + if (session.status !== "active") continue; + + const activity = await getThresholdActivity( + kv, + session, + opts.thresholdMs, + nowMs, + ); + if (!activity.ok) { + skipped.push({ + sessionId: session.id, + status: "skipped", + reason: activity.reason, + lastActivityAt: activity.at, + lastActivitySource: activity.source, + error: activity.error, + }); + continue; + } + + const ageMs = nowMs - activity.atMs; + if (ageMs >= opts.thresholdMs) { + candidates.push({ + sessionId: session.id, + session, + lastActivityAt: activity.at, + lastActivityMs: activity.atMs, + lastActivitySource: activity.source, + ageMs, + }); + } + } + + return { candidates, skipped }; +} + +export async function listAbandonedSessionCandidates( + kv: StateKV, + opts: { thresholdMs: number; now?: Date }, +): Promise { + const scan = await scanAbandonedSessions(kv, opts); + return scan.candidates; +} + +function getPipelineError(result: unknown): string | null { + if (!result || typeof result !== "object") return null; + if ((result as { success?: unknown 
}).success !== false) return null; + const error = (result as { error?: unknown }).error; + return typeof error === "string" ? error : JSON.stringify(result); +} + +async function runStoppedLifecycle( + sdk: ISdk, + sessionId: string, +): Promise<{ ok: true } | { ok: false; error: string }> { + try { + const result = await sdk.trigger({ + function_id: "event::session::stopped", + payload: { sessionId }, + }); + const failure = getPipelineError(result); + return failure ? { ok: false, error: failure } : { ok: true }; + } catch (err) { + return { + ok: false, + error: err instanceof Error ? err.message : String(err), + }; + } +} + +async function runPostRecoveryConsolidation( + sdk: ISdk, +): Promise<{ attempted: true; ok: true } | { attempted: true; ok: false; error: string }> { + try { + const result = await sdk.trigger({ + function_id: "mem::consolidate-pipeline", + payload: { tier: "all" }, + }); + const failure = getPipelineError(result); + return failure + ? { attempted: true, ok: false, error: failure } + : { attempted: true, ok: true }; + } catch (err) { + return { + attempted: true, + ok: false, + error: err instanceof Error ? 
err.message : String(err), + }; + } +} + +export function registerSessionReaperFunction(sdk: ISdk, kv: StateKV): void { + sdk.registerFunction( + "mem::session-reap", + async (data: SessionReapPayload = {}) => { + if (!isObject(data)) { + return { success: false, error: "payload must be an object" }; + } + + if (data.dryRun !== undefined && typeof data.dryRun !== "boolean") { + return { success: false, error: "dryRun must be a boolean" }; + } + + const threshold = resolveSessionReapThresholdMs({ + threshold: data.threshold, + thresholdMs: data.thresholdMs, + }); + if (!threshold.ok) return { success: false, error: threshold.error }; + if (threshold.warning) { + logger.warn("Invalid session reaper threshold env; using default", { + error: threshold.warning, + }); + } + + const dryRun = data.dryRun === true; + const source = typeof data.source === "string" ? data.source : "manual"; + const now = new Date(); + const scan = await scanAbandonedSessions(kv, { + thresholdMs: threshold.valueMs, + now, + }); + const sessions: SessionReapEntry[] = [ + ...scan.skipped.map((entry) => ({ ...entry })), + ]; + + if (dryRun) { + sessions.push( + ...scan.candidates.map((candidate) => ({ + sessionId: candidate.sessionId, + status: "eligible" as const, + reason: "abandoned-session-reap", + lastActivityAt: candidate.lastActivityAt, + lastActivitySource: candidate.lastActivitySource, + })), + ); + return { + success: true, + dryRun, + thresholdMs: threshold.valueMs, + scanned: scan.candidates.length + scan.skipped.length, + eligible: scan.candidates.length, + recovered: 0, + skipped: scan.skipped.length, + failed: 0, + sessions, + }; + } + + let recovered = 0; + let failed = 0; + + for (const candidate of scan.candidates) { + const entry = await withKeyedLock(`obs:${candidate.sessionId}`, async () => { + const fresh = await kv.get(KV.sessions, candidate.sessionId); + if (!fresh || fresh.status !== "active") { + return { + sessionId: candidate.sessionId, + status: "skipped" as const, + 
reason: "session_not_active", + }; + } + + const activity = await getThresholdActivity( + kv, + fresh, + threshold.valueMs, + now.getTime(), + ); + if (!activity.ok) { + return { + sessionId: fresh.id, + status: "skipped" as const, + reason: activity.reason, + lastActivityAt: activity.at, + lastActivitySource: activity.source, + error: activity.error, + }; + } + + const ageMs = now.getTime() - activity.atMs; + if (ageMs < threshold.valueMs) { + return { + sessionId: fresh.id, + status: "skipped" as const, + reason: "session_recent_activity", + lastActivityAt: activity.at, + lastActivitySource: activity.source, + }; + } + + const completed: Session = { + ...fresh, + status: "completed", + endedAt: activity.at, + }; + await kv.set(KV.sessions, fresh.id, completed); + const pipeline = await runStoppedLifecycle(sdk, fresh.id); + await recordAudit(kv, "heal", "mem::session-reap", [fresh.id], { + reason: "abandoned-session-reap", + source, + previousStatus: "active", + newStatus: "completed", + endedAt: activity.at, + lastActivitySource: activity.source, + thresholdMs: threshold.valueMs, + pipelineOk: pipeline.ok, + ...(pipeline.ok ? {} : { pipelineError: pipeline.error }), + }); + + return { + sessionId: fresh.id, + status: "recovered" as const, + reason: "abandoned-session-reap", + lastActivityAt: activity.at, + lastActivitySource: activity.source, + pipelineOk: pipeline.ok, + ...(pipeline.ok ? {} : { error: pipeline.error }), + }; + }); + + sessions.push(entry); + if (entry.status === "recovered") { + recovered++; + if (entry.pipelineOk === false) failed++; + } + } + + const consolidation = + recovered > 0 ? 
await runPostRecoveryConsolidation(sdk) : undefined; + if (consolidation && !consolidation.ok) failed++; + + return { + success: true, + dryRun, + thresholdMs: threshold.valueMs, + scanned: scan.candidates.length + scan.skipped.length, + eligible: scan.candidates.length, + recovered, + skipped: sessions.filter((entry) => entry.status === "skipped").length, + failed, + sessions, + ...(consolidation ? { consolidation } : {}), + }; + }, + ); +} diff --git a/src/index.ts b/src/index.ts index b4569e8c..6d838bca 100644 --- a/src/index.ts +++ b/src/index.ts @@ -57,6 +57,8 @@ import { getEvictSweepIntervalMs, startEvictSweep, } from "./functions/evict-scheduler.js"; +import { registerSessionReaperFunction } from "./functions/session-reaper.js"; +import { startSessionReaper } from "./functions/session-reaper-scheduler.js"; import { registerRelationsFunction } from "./functions/relations.js"; import { registerTimelineFunction } from "./functions/timeline.js"; import { registerLineageFunction } from "./functions/lineage.js"; @@ -369,6 +371,7 @@ async function main() { registerPatternsFunction(sdk, kv); registerRememberFunction(sdk, kv); registerEvictFunction(sdk, kv); + registerSessionReaperFunction(sdk, kv); registerRelationsFunction(sdk, kv); registerTimelineFunction(sdk, kv); registerLineageFunction(sdk, kv); @@ -580,7 +583,7 @@ async function main() { `Ready. ${embeddingProvider ? 
"Triple-stream (BM25+Vector+Graph)" : "BM25+Graph"} search active.`, ); bootLog( - `REST API: 134 endpoints at http://localhost:${config.restPort}/agentmemory/*`, + `REST API: 135 endpoints at http://localhost:${config.restPort}/agentmemory/*`, ); bootLog( `MCP surface (opt-in via \`npx @agentmemory/mcp\`): ${getAllTools().length} tools · 6 resources · 3 prompts`, @@ -618,6 +621,19 @@ async function main() { ); } + const sessionReaperTimers = startSessionReaper(sdk, logger, { + AGENTMEMORY_REAP_ENABLED: getEnvVar("AGENTMEMORY_REAP_ENABLED"), + AGENTMEMORY_REAP_ON_STARTUP: getEnvVar("AGENTMEMORY_REAP_ON_STARTUP"), + AGENTMEMORY_REAP_INTERVAL_MS: getEnvVar("AGENTMEMORY_REAP_INTERVAL_MS"), + AGENTMEMORY_REAP_THRESHOLD_MS: getEnvVar("AGENTMEMORY_REAP_THRESHOLD_MS"), + }); + if (sessionReaperTimers?.startupTimer) { + bootLog("Session reaper: startup reconciliation enabled"); + } + if (sessionReaperTimers?.intervalTimer) { + bootLog("Session reaper: periodic reconciliation enabled"); + } + if (process.env.LESSON_DECAY_ENABLED !== "false") { const lessonDecayTimer = setInterval(async () => { try { @@ -678,6 +694,8 @@ async function main() { dedupMap.stop(); indexPersistence.stop(); if (evictSweepTimer) clearInterval(evictSweepTimer); + if (sessionReaperTimers?.startupTimer) clearTimeout(sessionReaperTimers.startupTimer); + if (sessionReaperTimers?.intervalTimer) clearInterval(sessionReaperTimers.intervalTimer); await new Promise((resolve) => viewerServer.close(() => resolve())); await indexPersistence.save().catch((err) => { console.warn(`[agentmemory] Failed to save index on shutdown:`, err); diff --git a/src/triggers/api.ts b/src/triggers/api.ts index c3798b65..32762edb 100644 --- a/src/triggers/api.ts +++ b/src/triggers/api.ts @@ -16,6 +16,7 @@ import { getBoundViewerPort, getViewerSkipped } from "../viewer/server.js"; import { MAX_FILES_UPPER_BOUND } from "../functions/replay.js"; import { normalizeSessionMetadata } from "../functions/session-metadata.js"; import { 
normalizePositiveLimit } from "../functions/limits.js"; +import { resolveSessionReapThresholdMs } from "../functions/session-reaper.js"; import { logger } from "../logger.js"; import { filterSessionsByTime, @@ -1393,6 +1394,58 @@ export function registerApiTriggers( config: { api_path: "/agentmemory/evict", http_method: "POST" }, }); + sdk.registerFunction("api::session-reap", + async ( + req: ApiRequest<{ dryRun?: unknown; threshold?: unknown; thresholdMs?: unknown }>, + ): Promise => { + const authErr = checkAuth(req, secret); + if (authErr) return authErr; + + if (req.body?.dryRun !== undefined && typeof req.body.dryRun !== "boolean") { + return { + status_code: 400, + body: { error: "dryRun must be a boolean" }, + }; + } + + const normalizeParam = (value: unknown): unknown | undefined => { + if (value === undefined || value === null) return undefined; + if (typeof value === "string" && value.trim().length === 0) return undefined; + return value; + }; + const bodyThreshold = normalizeParam(req.body?.threshold); + const queryThreshold = normalizeParam(req.query_params?.["threshold"]); + const threshold = bodyThreshold !== undefined ? bodyThreshold : queryThreshold; + const bodyThresholdMs = normalizeParam(req.body?.thresholdMs); + const queryThresholdMs = normalizeParam(req.query_params?.["thresholdMs"]); + const thresholdMs = + bodyThresholdMs !== undefined ? 
bodyThresholdMs : queryThresholdMs; + const parsed = resolveSessionReapThresholdMs( + { threshold, thresholdMs }, + {}, + ); + if (!parsed.ok) { + return { + status_code: 400, + body: { error: parsed.error }, + }; + } + + const dryRun = + req.query_params?.["dryRun"] === "true" || req.body?.dryRun === true; + const result = await sdk.trigger({ + function_id: "mem::session-reap", + payload: { source: "api", dryRun, thresholdMs: parsed.valueMs }, + }); + return { status_code: 200, body: result }; + }, + ); + sdk.registerTrigger({ + type: "http", + function_id: "api::session-reap", + config: { api_path: "/agentmemory/sessions/reap", http_method: "POST" }, + }); + sdk.registerFunction("api::smart-search", async ( req: ApiRequest<{ diff --git a/test/api-boundary-coverage.test.ts b/test/api-boundary-coverage.test.ts index e13c358f..eeff6766 100644 --- a/test/api-boundary-coverage.test.ts +++ b/test/api-boundary-coverage.test.ts @@ -356,6 +356,7 @@ describe("REST API boundary coverage", () => { "api::generate-rules", "api::migrate", "api::evict", + "api::session-reap", "api::smart-search", "api::query", "api::diagnostic-followup", @@ -569,6 +570,30 @@ describe("REST API boundary coverage", () => { payload: { operation: "delete", limit: 3 }, }); + const sessionReap = sdk.getFunction("api::session-reap")!; + await expect(sessionReap(req({ body: { threshold: "bad" } }))).resolves.toMatchObject({ + status_code: 400, + }); + await expect(sessionReap(req({ body: { threshold: "4h", thresholdMs: 1000 } }))).resolves.toMatchObject({ + status_code: 400, + }); + await sessionReap(req({ + body: { dryRun: true, threshold: "4h", ignored: true }, + query: { thresholdMs: "", ignored: "true" }, + })); + expect(sdk.triggerCalls.at(-1)).toEqual({ + function_id: "mem::session-reap", + payload: { source: "api", dryRun: true, thresholdMs: 14_400_000 }, + }); + await sessionReap(req({ + body: { threshold: "", dryRun: true }, + query: { threshold: "30m" }, + })); + 
expect(sdk.triggerCalls.at(-1)).toEqual({ + function_id: "mem::session-reap", + payload: { source: "api", dryRun: true, thresholdMs: 1_800_000 }, + }); + await expect(search(req({ body: { query: "x", start_time: "not-a-date" }, }))).resolves.toMatchObject({ @@ -730,6 +755,7 @@ describe("REST API boundary coverage", () => { ["api::generate-rules", req({ body: { project: "git:repo" } }), 200], ["api::migrate", req({ body: { step: "audit", dryRun: true } }), 200], ["api::evict", req({ body: { dryRun: true } }), 200], + ["api::session-reap", req({ body: { dryRun: true, thresholdMs: 14400000 } }), 200], ["api::smart-search", req({ body: { query: "api", expandIds: ["obs_1"], limit: 5, project: "git:repo", includeLessons: true, agentId: "agent-env", sessionId: "ses_1", start_time: "2026-06-01T00:00:00Z", end_time: "2026-06-30T23:59:59Z" } }), 200], ["api::diagnostic-followup", req(), 200], ["api::timeline", req({ body: { anchor: "2026-06-14", before: 1, after: 1, project: "git:repo" } }), 200], diff --git a/test/cli-http-auth.test.ts b/test/cli-http-auth.test.ts index b3930fd4..fbf40f01 100644 --- a/test/cli-http-auth.test.ts +++ b/test/cli-http-auth.test.ts @@ -1,6 +1,6 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { spawn } from "node:child_process"; -import { mkdirSync, mkdtempSync, rmSync, writeFileSync } from "node:fs"; +import { mkdirSync, mkdtempSync, readFileSync, rmSync, writeFileSync } from "node:fs"; import { createServer } from "node:http"; import { tmpdir } from "node:os"; import { join } from "node:path"; @@ -391,4 +391,171 @@ describe("CLI JSON request auth headers", () => { await closeServer(server); } }); + + it("uses file-backed AGENTMEMORY_URL and secret for the reap command call path", async () => { + const requestedUrls: string[] = []; + const requestBodies: string[] = []; + const server = createServer((req, res) => { + requestedUrls.push(req.url || ""); + let body = ""; + req.on("data", (chunk) => { + body += 
chunk.toString(); + }); + req.on("end", () => { + requestBodies.push(body); + if (req.url === "/agentmemory/livez") { + res.writeHead(200, { "Content-Type": "application/json" }); + res.end(JSON.stringify({ ok: true })); + return; + } + if (req.url === "/agentmemory/sessions/reap") { + expect(req.headers["authorization"]).toBe("Bearer file-secret"); + res.writeHead(200, { "Content-Type": "application/json" }); + res.end( + JSON.stringify({ + success: true, + eligible: 1, + recovered: 0, + skipped: 0, + failed: 0, + }), + ); + return; + } + res.writeHead(404, { "Content-Type": "application/json" }); + res.end(JSON.stringify({ success: false, error: "unexpected path" })); + }); + }); + + await new Promise((resolve) => { + server.listen(0, "127.0.0.1", resolve); + }); + const address = server.address(); + if (!address || typeof address === "string") { + await closeServer(server); + throw new Error("test server did not bind to a TCP port"); + } + + writeEnv( + [ + `AGENTMEMORY_URL=http://127.0.0.1:${address.port}`, + "AGENTMEMORY_SECRET=file-secret", + ].join("\n"), + ); + + try { + const result = await runCli(["reap", "--dry-run", "--threshold", "4h"]); + + expect(result.exitCode).toBe(0); + expect(result.stderr).not.toContain("file-secret"); + expect(result.stdout + result.stderr).toContain("eligible=1"); + expect(requestedUrls).toEqual([ + "/agentmemory/livez", + "/agentmemory/sessions/reap", + ]); + expect(JSON.parse(requestBodies[1] || "{}")).toEqual({ + dryRun: true, + thresholdMs: 14_400_000, + }); + } finally { + await closeServer(server); + } + }); + + it("rejects invalid reap thresholds before probing livez", async () => { + const requestedUrls: string[] = []; + const server = createServer((req, res) => { + requestedUrls.push(req.url || ""); + res.writeHead(200, { "Content-Type": "application/json" }); + res.end(JSON.stringify({ ok: true })); + }); + await new Promise((resolve) => { + server.listen(0, "127.0.0.1", resolve); + }); + const address = 
server.address(); + if (!address || typeof address === "string") { + await closeServer(server); + throw new Error("test server did not bind to a TCP port"); + } + writeEnv(`AGENTMEMORY_URL=http://127.0.0.1:${address.port}`); + + try { + const result = await runCli(["reap", "--threshold", "1h30m"]); + + expect(result.exitCode).toBe(1); + expect(result.stdout + result.stderr).toContain("threshold"); + expect(requestedUrls).toEqual([]); + } finally { + await closeServer(server); + } + }); + + it("rejects missing reap threshold values before probing livez", async () => { + const requestedUrls: string[] = []; + const server = createServer((req, res) => { + requestedUrls.push(req.url || ""); + res.writeHead(200, { "Content-Type": "application/json" }); + res.end(JSON.stringify({ ok: true })); + }); + await new Promise((resolve) => { + server.listen(0, "127.0.0.1", resolve); + }); + const address = server.address(); + if (!address || typeof address === "string") { + await closeServer(server); + throw new Error("test server did not bind to a TCP port"); + } + writeEnv(`AGENTMEMORY_URL=http://127.0.0.1:${address.port}`); + + try { + const result = await runCli(["reap", "--threshold"]); + + expect(result.exitCode).toBe(1); + expect(result.stdout + result.stderr).toContain("--threshold requires a value"); + expect(requestedUrls).toEqual([]); + } finally { + await closeServer(server); + } + }); + + it("rejects ambiguous reap threshold flags before probing livez", async () => { + const requestedUrls: string[] = []; + const server = createServer((req, res) => { + requestedUrls.push(req.url || ""); + res.writeHead(200, { "Content-Type": "application/json" }); + res.end(JSON.stringify({ ok: true })); + }); + await new Promise((resolve) => { + server.listen(0, "127.0.0.1", resolve); + }); + const address = server.address(); + if (!address || typeof address === "string") { + await closeServer(server); + throw new Error("test server did not bind to a TCP port"); + } + 
writeEnv(`AGENTMEMORY_URL=http://127.0.0.1:${address.port}`); + + try { + const result = await runCli([ + "reap", + "--threshold=4h", + "--threshold-ms", + "14400000", + ]); + + expect(result.exitCode).toBe(1); + expect(result.stdout + result.stderr).toContain("either threshold or thresholdMs"); + expect(requestedUrls).toEqual([]); + } finally { + await closeServer(server); + } + }); + + it("documents reap in CLI help and command map", () => { + const source = readFileSync(join(REPO_ROOT, "src/cli.ts"), "utf-8"); + + expect(source).toContain("reap"); + expect(source).toContain("agentmemory reap --threshold 4h"); + expect(source).toContain("reap: runReap"); + }); }); diff --git a/test/diagnostics.test.ts b/test/diagnostics.test.ts index 1e168767..b6185285 100644 --- a/test/diagnostics.test.ts +++ b/test/diagnostics.test.ts @@ -16,6 +16,7 @@ import type { Session, Memory, MeshPeer, + CompressedObservation, } from "../src/types.js"; import { KV } from "../src/state/schema.js"; @@ -33,15 +34,16 @@ function mockKV() { delete: async (scope: string, key: string): Promise => { store.get(scope)?.delete(key); }, - list: async (scope: string): Promise => { + list: vi.fn(async (scope: string): Promise => { const entries = store.get(scope); return entries ? (Array.from(entries.values()) as T[]) : []; - }, + }), }; } function mockSdk() { const functions = new Map(); + const triggerCalls: Array<{ function_id: string; payload: unknown }> = []; return { registerFunction: (idOrOpts: string | { id: string }, handler: Function) => { const id = typeof idOrOpts === "string" ? idOrOpts : idOrOpts.id; @@ -51,10 +53,15 @@ function mockSdk() { trigger: async (idOrInput: string | { function_id: string; payload: unknown }, data?: unknown) => { const id = typeof idOrInput === "string" ? idOrInput : idOrInput.function_id; const payload = typeof idOrInput === "string" ? 
data : idOrInput.payload; + triggerCalls.push({ function_id: id, payload }); const fn = functions.get(id); + if (!fn && id === "mem::session-reap") { + return { success: true, eligible: 1, recovered: payload && typeof payload === "object" && "dryRun" in payload && payload.dryRun ? 0 : 1, skipped: 0, failed: 0 }; + } if (!fn) throw new Error(`No function: ${id}`); return fn(payload); }, + triggerCalls, }; } @@ -147,6 +154,24 @@ function makeSession(overrides: Partial = {}): Session { }; } +function makeObservation( + overrides: Partial & { id: string; sessionId: string; timestamp: string }, +): CompressedObservation { + return { + id: overrides.id, + sessionId: overrides.sessionId, + timestamp: overrides.timestamp, + type: "conversation", + title: "Observation", + facts: [], + narrative: "Observation", + concepts: [], + files: [], + importance: 5, + ...overrides, + }; +} + function makeMemory(overrides: Partial = {}): Memory { return { id: `mem_${Date.now().toString(36)}_${Math.random().toString(36).slice(2, 8)}`, @@ -386,10 +411,10 @@ describe("Diagnostics Functions", () => { expect(check!.fixable).toBe(true); }); - it("active session older than 24h produces warn", async () => { + it("active session idle beyond 4h produces fixable warn", async () => { const session = makeSession({ status: "active", - startedAt: new Date(Date.now() - 25 * 60 * 60 * 1000).toISOString(), + startedAt: new Date(Date.now() - 5 * 60 * 60 * 1000).toISOString(), }); await kv.set(KV.sessions, session.id, session); @@ -402,7 +427,79 @@ describe("Diagnostics Functions", () => { ); expect(check).toBeDefined(); expect(check!.status).toBe("warn"); - expect(check!.fixable).toBe(false); + expect(check!.fixable).toBe(true); + }); + + it("old startedAt plus recent observation does not warn", async () => { + const session = makeSession({ + status: "active", + startedAt: new Date(Date.now() - 6 * 60 * 60 * 1000).toISOString(), + }); + await kv.set(KV.sessions, session.id, session); + await kv.set( + 
KV.observations(session.id), + "obs_recent", + makeObservation({ + id: "obs_recent", + sessionId: session.id, + timestamp: new Date(Date.now() - 30 * 60 * 1000).toISOString(), + }), + ); + + const result = (await sdk.trigger("mem::diagnose", { + categories: ["sessions"], + })) as { checks: DiagnosticCheck[] }; + + expect(result.checks.some((c) => c.name.startsWith("abandoned-session:"))).toBe(false); + expect(result.checks.find((c) => c.name === "sessions-ok")).toBeDefined(); + }); + + it("exact 4h session threshold produces fixable warn", async () => { + const session = makeSession({ + status: "active", + startedAt: new Date(Date.now() - 4 * 60 * 60 * 1000).toISOString(), + }); + await kv.set(KV.sessions, session.id, session); + + const result = (await sdk.trigger("mem::diagnose", { + categories: ["sessions"], + })) as { checks: DiagnosticCheck[] }; + + const check = result.checks.find((c) => + c.name.startsWith("abandoned-session:"), + ); + expect(check).toBeDefined(); + expect(check!.fixable).toBe(true); + }); + + it("all-invalid session timestamps do not crash diagnostics", async () => { + const session = makeSession({ + status: "active", + startedAt: "not-a-date", + }); + await kv.set(KV.sessions, session.id, session); + + const result = (await sdk.trigger("mem::diagnose", { + categories: ["sessions"], + })) as { success: boolean; checks: DiagnosticCheck[] }; + + expect(result.success).toBe(true); + expect(result.checks.find((c) => c.name === "sessions-ok")).toBeDefined(); + }); + + it("fresh updatedAt sessions do not enumerate observations", async () => { + const session = { + ...makeSession({ + status: "active", + startedAt: new Date(Date.now() - 6 * 60 * 60 * 1000).toISOString(), + }), + updatedAt: new Date(Date.now() - 30 * 60 * 1000).toISOString(), + }; + await kv.set(KV.sessions, session.id, session); + + await sdk.trigger("mem::diagnose", { categories: ["sessions"] }); + + expect(kv.list).not.toHaveBeenCalledWith(KV.observations(session.id)); }); 
it("memory with stale isLatest produces fail (fixable)", async () => { @@ -639,6 +736,21 @@ describe("Diagnostics Functions", () => { const unchanged = await kv.get(KV.actions, blocked.id); expect(unchanged!.status).toBe("blocked"); }); + + it("delegates session recovery to mem::session-reap", async () => { + const result = (await sdk.trigger("mem::heal", { + categories: ["sessions"], + dryRun: true, + })) as { success: boolean; fixed: number; details: string[] }; + + expect(result.success).toBe(true); + expect(sdk.triggerCalls).toContainEqual({ + function_id: "mem::session-reap", + payload: { source: "heal", dryRun: true }, + }); + expect(result.fixed).toBe(1); + expect(result.details[0]).toContain("abandoned session"); + }); }); describe("per-store tally categories (#lesson-visibility)", () => { diff --git a/test/session-reaper-scheduler.test.ts b/test/session-reaper-scheduler.test.ts new file mode 100644 index 00000000..deedad27 --- /dev/null +++ b/test/session-reaper-scheduler.test.ts @@ -0,0 +1,214 @@ +import { readFileSync } from "node:fs"; +import { describe, expect, it, vi } from "vitest"; +import { + DEFAULT_SESSION_REAP_THRESHOLD_MS, + startSessionReaper, +} from "../src/functions/session-reaper-scheduler.js"; + +describe("scheduled session reaper", () => { + it("schedules an unref'd startup reconciliation by default", async () => { + const trigger = vi.fn().mockResolvedValue({ success: true }); + const log = { warn: vi.fn() }; + const unref = vi.fn(); + let callback: (() => Promise) | undefined; + const setTimeoutFn = vi.fn((cb: () => Promise, delayMs: number) => { + callback = cb; + expect(delayMs).toBe(0); + return { unref }; + }); + const setIntervalFn = vi.fn(); + + const timers = startSessionReaper( + { trigger } as never, + log, + {}, + setTimeoutFn as never, + setIntervalFn as never, + ); + + expect(timers).toEqual({ startupTimer: { unref } }); + expect(setTimeoutFn).toHaveBeenCalledTimes(1); + expect(setIntervalFn).not.toHaveBeenCalled(); + 
expect(unref).toHaveBeenCalledTimes(1); + + await callback?.(); + + expect(trigger).toHaveBeenCalledWith({ + function_id: "mem::session-reap", + payload: { + dryRun: false, + source: "startup", + thresholdMs: DEFAULT_SESSION_REAP_THRESHOLD_MS, + }, + }); + }); + + it("can be disabled explicitly", () => { + const trigger = vi.fn(); + const setTimeoutFn = vi.fn(); + const setIntervalFn = vi.fn(); + + const timers = startSessionReaper( + { trigger } as never, + { warn: vi.fn() }, + { AGENTMEMORY_REAP_ENABLED: "false" }, + setTimeoutFn as never, + setIntervalFn as never, + ); + + expect(timers).toBeNull(); + expect(setTimeoutFn).not.toHaveBeenCalled(); + expect(setIntervalFn).not.toHaveBeenCalled(); + expect(trigger).not.toHaveBeenCalled(); + }); + + it("disables only startup when AGENTMEMORY_REAP_ON_STARTUP=false", () => { + const setTimeoutFn = vi.fn(); + const setIntervalFn = vi.fn(() => ({ unref: vi.fn() })); + + const timers = startSessionReaper( + { trigger: vi.fn() } as never, + { warn: vi.fn() }, + { + AGENTMEMORY_REAP_ON_STARTUP: "false", + AGENTMEMORY_REAP_INTERVAL_MS: "60000", + }, + setTimeoutFn as never, + setIntervalFn as never, + ); + + expect(timers).toHaveProperty("intervalTimer"); + expect(timers).not.toHaveProperty("startupTimer"); + expect(setTimeoutFn).not.toHaveBeenCalled(); + expect(setIntervalFn).toHaveBeenCalledWith(expect.any(Function), 60000); + }); + + it("does not schedule periodic reconciliation when interval is unset or zero", () => { + const setTimeoutFn = vi.fn(() => ({ unref: vi.fn() })); + const setIntervalFn = vi.fn(); + + startSessionReaper( + { trigger: vi.fn() } as never, + { warn: vi.fn() }, + {}, + setTimeoutFn as never, + setIntervalFn as never, + ); + startSessionReaper( + { trigger: vi.fn() } as never, + { warn: vi.fn() }, + { AGENTMEMORY_REAP_INTERVAL_MS: "0" }, + setTimeoutFn as never, + setIntervalFn as never, + ); + + expect(setIntervalFn).not.toHaveBeenCalled(); + }); + + it("passes threshold env through startup and 
periodic payloads", async () => { + const trigger = vi.fn().mockResolvedValue({ success: true }); + const startupUnref = vi.fn(); + const intervalUnref = vi.fn(); + let startupCallback: (() => Promise) | undefined; + let intervalCallback: (() => Promise) | undefined; + const setTimeoutFn = vi.fn((cb: () => Promise) => { + startupCallback = cb; + return { unref: startupUnref }; + }); + const setIntervalFn = vi.fn((cb: () => Promise, intervalMs: number) => { + intervalCallback = cb; + expect(intervalMs).toBe(3600000); + return { unref: intervalUnref }; + }); + + startSessionReaper( + { trigger } as never, + { warn: vi.fn() }, + { + AGENTMEMORY_REAP_THRESHOLD_MS: "60000", + AGENTMEMORY_REAP_INTERVAL_MS: "3600000", + }, + setTimeoutFn as never, + setIntervalFn as never, + ); + + await startupCallback?.(); + await intervalCallback?.(); + + expect(trigger).toHaveBeenNthCalledWith(1, { + function_id: "mem::session-reap", + payload: { dryRun: false, source: "startup", thresholdMs: 60000 }, + }); + expect(trigger).toHaveBeenNthCalledWith(2, { + function_id: "mem::session-reap", + payload: { dryRun: false, source: "scheduler", thresholdMs: 60000 }, + }); + expect(startupUnref).toHaveBeenCalledTimes(1); + expect(intervalUnref).toHaveBeenCalledTimes(1); + }); + + it("falls back to default threshold and warns for invalid threshold env", async () => { + const trigger = vi.fn().mockResolvedValue({ success: true }); + const log = { warn: vi.fn() }; + let callback: (() => Promise) | undefined; + const setTimeoutFn = vi.fn((cb: () => Promise) => { + callback = cb; + return { unref: vi.fn() }; + }); + + startSessionReaper( + { trigger } as never, + log, + { AGENTMEMORY_REAP_THRESHOLD_MS: "bad" }, + setTimeoutFn as never, + vi.fn() as never, + ); + await callback?.(); + + expect(log.warn).toHaveBeenCalledWith( + "Invalid session reaper threshold env; using default", + expect.objectContaining({ error: expect.any(String) }), + ); + expect(trigger).toHaveBeenCalledWith({ + function_id: 
"mem::session-reap", + payload: { + dryRun: false, + source: "startup", + thresholdMs: DEFAULT_SESSION_REAP_THRESHOLD_MS, + }, + }); + }); + + it("contains and logs trigger failures", async () => { + const trigger = vi.fn().mockRejectedValue(new Error("boom")); + const log = { warn: vi.fn() }; + let callback: (() => Promise) | undefined; + const setTimeoutFn = vi.fn((cb: () => Promise) => { + callback = cb; + return { unref: vi.fn() }; + }); + + startSessionReaper( + { trigger } as never, + log, + {}, + setTimeoutFn as never, + vi.fn() as never, + ); + + await expect(callback?.()).resolves.toBeUndefined(); + expect(log.warn).toHaveBeenCalledWith("Session reaper failed", { + source: "startup", + error: "boom", + }); + }); + + it("worker registers and cleans up session reaper timers", () => { + const source = readFileSync("src/index.ts", "utf-8"); + + expect(source).toContain("registerSessionReaperFunction(sdk, kv)"); + expect(source).toContain("const sessionReaperTimers = startSessionReaper("); + expect(source).toContain("if (sessionReaperTimers?.startupTimer) clearTimeout(sessionReaperTimers.startupTimer);"); + expect(source).toContain("if (sessionReaperTimers?.intervalTimer) clearInterval(sessionReaperTimers.intervalTimer);"); + }); +}); diff --git a/test/session-reaper.test.ts b/test/session-reaper.test.ts new file mode 100644 index 00000000..0679ff66 --- /dev/null +++ b/test/session-reaper.test.ts @@ -0,0 +1,428 @@ +import { beforeEach, afterEach, describe, expect, it, vi } from "vitest"; +import { KV } from "../src/state/schema.js"; +import { withKeyedLock } from "../src/state/keyed-mutex.js"; +import type { CompressedObservation, Session } from "../src/types.js"; +import { + listAbandonedSessionCandidates, + parseSessionReapThreshold, + registerSessionReaperFunction, +} from "../src/functions/session-reaper.js"; + +vi.mock("../src/logger.js", () => ({ + logger: { info: vi.fn(), warn: vi.fn(), error: vi.fn() }, +})); + +type TestSession = Session & { 
updatedAt?: string }; +type TriggerCall = { function_id: string; payload: unknown }; + +function makeSession(overrides: Partial & { id: string }): TestSession { + return { + id: overrides.id, + project: "git:repo", + cwd: "/repo", + startedAt: "2026-06-19T08:00:00.000Z", + updatedAt: "2026-06-19T08:00:00.000Z", + status: "active", + observationCount: 0, + ...overrides, + }; +} + +function makeObservation( + overrides: Partial & { id: string; sessionId: string; timestamp: string }, +): CompressedObservation { + return { + id: overrides.id, + sessionId: overrides.sessionId, + timestamp: overrides.timestamp, + type: "conversation", + title: "Observation", + facts: [], + narrative: "Observation", + concepts: [], + files: [], + importance: 5, + ...overrides, + }; +} + +function mockKV() { + const store = new Map>(); + const api = { + beforeGet: undefined as undefined | ((scope: string, key: string) => void | Promise), + get: vi.fn(async (scope: string, key: string): Promise => { + await api.beforeGet?.(scope, key); + return (store.get(scope)?.get(key) as T) ?? null; + }), + set: vi.fn(async (scope: string, key: string, value: T): Promise => { + if (!store.has(scope)) store.set(scope, new Map()); + store.get(scope)!.set(key, value); + return value; + }), + delete: vi.fn(async (scope: string, key: string): Promise => { + store.get(scope)?.delete(key); + }), + list: vi.fn(async (scope: string): Promise => { + const entries = store.get(scope); + return entries ? (Array.from(entries.values()) as T[]) : []; + }), + seed: (scope: string, key: string, value: unknown) => { + if (!store.has(scope)) store.set(scope, new Map()); + store.get(scope)!.set(key, value); + }, + values: (scope: string): T[] => + Array.from(store.get(scope)?.values() ?? 
[]) as T[], + }; + return api; +} + +function mockSdk() { + const functions = new Map(); + const triggerCalls: TriggerCall[] = []; + const trigger = vi.fn(async (input: TriggerCall) => { + triggerCalls.push(input); + const handler = functions.get(input.function_id); + if (handler) return handler(input.payload); + if (input.function_id === "event::session::stopped") return { success: true }; + if (input.function_id === "mem::consolidate-pipeline") return { success: true }; + return { success: true }; + }); + return { + registerFunction: (idOrOpts: string | { id: string }, handler: Function) => { + const id = typeof idOrOpts === "string" ? idOrOpts : idOrOpts.id; + functions.set(id, handler); + }, + registerTrigger: vi.fn(), + trigger, + triggerCalls, + getFunction: (id: string) => functions.get(id), + }; +} + +describe("session reaper", () => { + beforeEach(() => { + vi.useFakeTimers(); + vi.setSystemTime(new Date("2026-06-19T16:00:00.000Z")); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + it("parses supported thresholds and rejects ambiguous duration text", () => { + expect(parseSessionReapThreshold("4h")).toEqual({ ok: true, valueMs: 14_400_000 }); + expect(parseSessionReapThreshold("30m")).toEqual({ ok: true, valueMs: 1_800_000 }); + expect(parseSessionReapThreshold("250ms")).toEqual({ ok: true, valueMs: 250 }); + expect(parseSessionReapThreshold(1000)).toEqual({ ok: true, valueMs: 1000 }); + expect(parseSessionReapThreshold("0")).toMatchObject({ ok: false }); + expect(parseSessionReapThreshold("1h30m")).toMatchObject({ ok: false }); + }); + + it("uses the newest valid observation when session-level activity is stale", async () => { + const kv = mockKV(); + const session = makeSession({ + id: "ses_obs_wins", + startedAt: "2026-06-19T08:00:00.000Z", + updatedAt: "2026-06-19T09:00:00.000Z", + }); + kv.seed(KV.sessions, session.id, session); + kv.seed( + KV.observations(session.id), + "obs_old", + makeObservation({ + id: "obs_old", + sessionId: 
session.id, + timestamp: "2026-06-19T10:00:00.000Z", + }), + ); + kv.seed( + KV.observations(session.id), + "obs_new", + makeObservation({ + id: "obs_new", + sessionId: session.id, + timestamp: "2026-06-19T11:00:00.000Z", + }), + ); + + const candidates = await listAbandonedSessionCandidates(kv as never, { + thresholdMs: 4 * 60 * 60 * 1000, + now: new Date("2026-06-19T16:00:00.000Z"), + }); + + expect(candidates).toHaveLength(1); + expect(candidates[0]).toMatchObject({ + sessionId: session.id, + lastActivitySource: "observation", + lastActivityAt: "2026-06-19T11:00:00.000Z", + }); + }); + + it("does not enumerate observations for sessions with fresh updatedAt", async () => { + const kv = mockKV(); + const session = makeSession({ + id: "ses_fresh_update", + updatedAt: "2026-06-19T15:30:00.000Z", + }); + kv.seed(KV.sessions, session.id, session); + kv.seed( + KV.observations(session.id), + "obs_old", + makeObservation({ + id: "obs_old", + sessionId: session.id, + timestamp: "2026-06-19T08:00:00.000Z", + }), + ); + + const candidates = await listAbandonedSessionCandidates(kv as never, { + thresholdMs: 4 * 60 * 60 * 1000, + now: new Date("2026-06-19T16:00:00.000Z"), + }); + + expect(candidates).toEqual([]); + expect(kv.list).not.toHaveBeenCalledWith(KV.observations(session.id)); + }); + + it("skips stale sessions when a recent observation proves activity", async () => { + const kv = mockKV(); + const session = makeSession({ + id: "ses_recent_obs", + updatedAt: "2026-06-19T08:00:00.000Z", + }); + kv.seed(KV.sessions, session.id, session); + kv.seed( + KV.observations(session.id), + "obs_recent", + makeObservation({ + id: "obs_recent", + sessionId: session.id, + timestamp: "2026-06-19T15:00:00.000Z", + }), + ); + + const candidates = await listAbandonedSessionCandidates(kv as never, { + thresholdMs: 4 * 60 * 60 * 1000, + now: new Date("2026-06-19T16:00:00.000Z"), + }); + + expect(candidates).toEqual([]); + 
expect(kv.list).toHaveBeenCalledWith(KV.observations(session.id)); + }); + + it("falls back from invalid updatedAt to observations and then startedAt", async () => { + const kv = mockKV(); + const withObservation = makeSession({ + id: "ses_invalid_with_obs", + updatedAt: "not-a-date", + startedAt: "2026-06-19T07:00:00.000Z", + }); + const startedOnly = makeSession({ + id: "ses_started_only", + updatedAt: "not-a-date", + startedAt: "2026-06-19T06:00:00.000Z", + }); + kv.seed(KV.sessions, withObservation.id, withObservation); + kv.seed(KV.sessions, startedOnly.id, startedOnly); + kv.seed( + KV.observations(withObservation.id), + "obs", + makeObservation({ + id: "obs", + sessionId: withObservation.id, + timestamp: "2026-06-19T11:00:00.000Z", + }), + ); + + const candidates = await listAbandonedSessionCandidates(kv as never, { + thresholdMs: 4 * 60 * 60 * 1000, + now: new Date("2026-06-19T16:00:00.000Z"), + }); + + expect(candidates.map((c) => [c.sessionId, c.lastActivitySource])).toEqual([ + [withObservation.id, "observation"], + [startedOnly.id, "startedAt"], + ]); + }); + + it("dry run reports eligible and future sessions without writes or lifecycle triggers", async () => { + const sdk = mockSdk(); + const kv = mockKV(); + registerSessionReaperFunction(sdk as never, kv as never); + const eligible = makeSession({ id: "ses_eligible", updatedAt: "2026-06-19T08:00:00.000Z" }); + const future = makeSession({ id: "ses_future", updatedAt: "2026-06-19T18:00:00.000Z" }); + kv.seed(KV.sessions, eligible.id, eligible); + kv.seed(KV.sessions, future.id, future); + + const result = await sdk.trigger({ + function_id: "mem::session-reap", + payload: { dryRun: true, thresholdMs: 14_400_000 }, + }) as { + success: boolean; + eligible: number; + recovered: number; + sessions: Array<{ sessionId: string; status: string; reason?: string }>; + }; + + expect(result).toMatchObject({ success: true, eligible: 1, recovered: 0 }); + expect(result.sessions).toEqual( + expect.arrayContaining([ + 
expect.objectContaining({ sessionId: eligible.id, status: "eligible" }), + expect.objectContaining({ sessionId: future.id, status: "skipped", reason: "future_last_activity" }), + ]), + ); + expect(await kv.get(KV.sessions, eligible.id)).toMatchObject({ status: "active" }); + expect(sdk.triggerCalls.some((c) => c.function_id === "event::session::stopped")).toBe(false); + expect(sdk.triggerCalls.some((c) => c.function_id === "mem::consolidate-pipeline")).toBe(false); + expect(kv.values(KV.audit)).toEqual([]); + }); + + it("completes abandoned sessions, runs stopped lifecycle, audits each session, and consolidates once", async () => { + const sdk = mockSdk(); + const kv = mockKV(); + registerSessionReaperFunction(sdk as never, kv as never); + const first = makeSession({ id: "ses_first", updatedAt: "2026-06-19T08:00:00.000Z" }); + const second = makeSession({ id: "ses_second", updatedAt: "2026-06-19T09:00:00.000Z" }); + kv.seed(KV.sessions, first.id, first); + kv.seed(KV.sessions, second.id, second); + + const result = await sdk.trigger({ + function_id: "mem::session-reap", + payload: { thresholdMs: 14_400_000 }, + }) as { success: boolean; recovered: number; failed: number }; + + expect(result).toMatchObject({ success: true, recovered: 2, failed: 0 }); + await expect(kv.get(KV.sessions, first.id)).resolves.toMatchObject({ + status: "completed", + endedAt: first.updatedAt, + }); + await expect(kv.get(KV.sessions, second.id)).resolves.toMatchObject({ + status: "completed", + endedAt: second.updatedAt, + }); + expect(sdk.triggerCalls.filter((c) => c.function_id === "event::session::stopped")).toEqual([ + { function_id: "event::session::stopped", payload: { sessionId: first.id } }, + { function_id: "event::session::stopped", payload: { sessionId: second.id } }, + ]); + expect(sdk.triggerCalls.filter((c) => c.function_id === "mem::consolidate-pipeline")).toEqual([ + { function_id: "mem::consolidate-pipeline", payload: { tier: "all" } }, + ]); + 
expect(kv.values(KV.audit)).toHaveLength(2);
+    // The first audit record must describe the reap as a "heal" with a healthy pipeline.
+    expect(kv.values(KV.audit)[0]).toMatchObject({
+      operation: "heal",
+      functionId: "mem::session-reap",
+      details: expect.objectContaining({ reason: "abandoned-session-reap", pipelineOk: true }),
+    });
+  });
+
+  // A failing "stopped" lifecycle must not undo the recovery: the session stays
+  // completed, while the failure is surfaced in the result and in the audit record.
+  it("keeps completed state and reports pipeline failure when stopped lifecycle fails", async () => {
+    const sdk = mockSdk();
+    // Route registered functions to their handlers, make the stopped event throw,
+    // and let every other trigger succeed.
+    sdk.trigger.mockImplementation(async (input: TriggerCall) => {
+      sdk.triggerCalls.push(input);
+      const handler = sdk.getFunction(input.function_id);
+      if (handler) return handler(input.payload);
+      if (input.function_id === "event::session::stopped") {
+        throw new Error("summarize failed");
+      }
+      return { success: true };
+    });
+    const kv = mockKV();
+    registerSessionReaperFunction(sdk as never, kv as never);
+    const session = makeSession({ id: "ses_pipeline_fail", updatedAt: "2026-06-19T08:00:00.000Z" });
+    kv.seed(KV.sessions, session.id, session);
+
+    // thresholdMs: 14_400_000 ms = 4 hours.
+    const result = await sdk.trigger({
+      function_id: "mem::session-reap",
+      payload: { thresholdMs: 14_400_000 },
+    }) as {
+      recovered: number;
+      failed: number;
+      sessions: Array<{ sessionId: string; pipelineOk: boolean; error?: string }>;
+    };
+
+    expect(result.recovered).toBe(1);
+    expect(result.failed).toBe(1);
+    await expect(kv.get(KV.sessions, session.id)).resolves.toMatchObject({
+      status: "completed",
+      endedAt: session.updatedAt,
+    });
+    expect(result.sessions[0]).toMatchObject({
+      sessionId: session.id,
+      pipelineOk: false,
+      error: "summarize failed",
+    });
+    expect(kv.values(KV.audit)[0]).toMatchObject({
+      details: expect.objectContaining({ pipelineOk: false }),
+    });
+  });
+
+  // Race: another writer completes the session between the reaper's scan and its re-read.
+  it("skips a session that becomes completed before the locked re-read", async () => {
+    const sdk = mockSdk();
+    const kv = mockKV();
+    registerSessionReaperFunction(sdk as never, kv as never);
+    const session = makeSession({ id: "ses_completed_race", updatedAt: "2026-06-19T08:00:00.000Z" });
+    kv.seed(KV.sessions, session.id, session);
+    // One-shot hook: on the first matching call it rewrites the stored session as
+    // completed and then unhooks itself.
+    // NOTE(review): presumably the mock invokes beforeGet ahead of each kv.get — confirm.
+    kv.beforeGet = async (scope, key) => {
+      if (scope === KV.sessions && key === session.id) {
+        kv.seed(KV.sessions, session.id, { ...session, status: "completed", endedAt: session.updatedAt });
+        kv.beforeGet = undefined;
+      }
+    };
+
+    const result = await sdk.trigger({
+      function_id: "mem::session-reap",
+      payload: { thresholdMs: 14_400_000 },
+    }) as { recovered: number; sessions: Array<{ status: string; reason?: string }> };
+
+    expect(result.recovered).toBe(0);
+    expect(result.sessions[0]).toMatchObject({ status: "skipped", reason: "session_not_active" });
+    expect(sdk.triggerCalls.some((c) => c.function_id === "event::session::stopped")).toBe(false);
+  });
+
+  // Race: a simulated observe write holds the per-session lock and lands a fresh
+  // update, so the reaper must report the session as recently active and leave it alone.
+  it("waits on the observe lock and skips when a fresh update lands first", async () => {
+    const sdk = mockSdk();
+    const kv = mockKV();
+    registerSessionReaperFunction(sdk as never, kv as never);
+    const session = makeSession({ id: "ses_observe_lock", updatedAt: "2026-06-19T08:00:00.000Z" });
+    kv.seed(KV.sessions, session.id, session);
+
+    // Assigned inside the Promise executor below, hence the definite-assignment `!`.
+    let releaseObserve!: () => void;
+    // Take the `obs:<sessionId>` lock, block until released, then write a newer
+    // updatedAt on the session plus one matching observation.
+    const observeWrite = withKeyedLock(`obs:${session.id}`, async () => {
+      // Promise<void> keeps `resolve` assignable to the zero-argument releaseObserve.
+      await new Promise<void>((resolve) => {
+        releaseObserve = resolve;
+      });
+      await kv.set(KV.sessions, session.id, {
+        ...session,
+        updatedAt: "2026-06-19T15:30:00.000Z",
+        observationCount: 1,
+      });
+      await kv.set(
+        KV.observations(session.id),
+        "obs_recent",
+        makeObservation({
+          id: "obs_recent",
+          sessionId: session.id,
+          timestamp: "2026-06-19T15:30:00.000Z",
+        }),
+      );
+    });
+
+    // Start the reap without awaiting it so it can run alongside the pending observe write.
+    const reap = sdk.trigger({
+      function_id: "mem::session-reap",
+      payload: { thresholdMs: 14_400_000 },
+    }) as Promise<{ recovered: number; sessions: Array<{ status: string; reason?: string }> }>;
+
+    // Yield one microtask tick before releasing the lock holder.
+    // NOTE(review): this assumes the lock callback has already run and assigned
+    // releaseObserve by now — confirm against withKeyedLock's scheduling.
+    await Promise.resolve();
+    releaseObserve();
+    await observeWrite;
+    const result = await reap;
+
+    expect(result.recovered).toBe(0);
+    expect(result.sessions[0]).toMatchObject({ status: "skipped", reason: "session_recent_activity" });
+    await expect(kv.get(KV.sessions, session.id)).resolves.toMatchObject({
+      status: "active",
+      updatedAt: "2026-06-19T15:30:00.000Z",
+    });
+    expect(sdk.triggerCalls.some((c) => c.function_id === "event::session::stopped")).toBe(false);
+  });
+});