feat: Sessions - bidirectional durable agent streams #3417
🦋 Changeset detected — latest commit: 84d3db1. This PR includes changesets to release 29 packages.
Durable, typed, bidirectional I/O primitive that outlives a single run.
Ship target is agent/chat use cases; run-scoped streams.pipe/streams.input
are untouched and do not create Session rows.
Postgres
- New Session table: id, friendlyId, externalId, type (plain string),
denormalised project/environment/organization scalar columns (no FKs),
taskIdentifier, tags String[], metadata Json, closedAt, closedReason,
expiresAt, timestamps
- Point-lookup indexes only (friendlyId unique, (env, externalId) unique,
expiresAt). List queries are served from ClickHouse so Postgres stays
minimal and insert-heavy.
Control-plane API
- POST /api/v1/sessions create (idempotent via externalId)
- GET /api/v1/sessions list with filters (type, tag,
taskIdentifier, externalId, status
ACTIVE|CLOSED|EXPIRED, period/from/to)
and cursor pagination, ClickHouse-backed
- GET /api/v1/sessions/:session retrieve — polymorphic: `session_` prefix
hits friendlyId, otherwise externalId
- PATCH /api/v1/sessions/:session update tags/metadata/externalId
- POST /api/v1/sessions/:session/close terminal close (idempotent)
Realtime (S2-backed)
- PUT /realtime/v1/sessions/:session/:io returns S2 creds
- GET /realtime/v1/sessions/:session/:io SSE subscribe
- POST /realtime/v1/sessions/:session/:io/append server-side append
- S2 key format: sessions/{friendlyId}/{out|in}
Auth
- sessions added to ResourceTypes. read:sessions:{id},
write:sessions:{id}, admin:sessions:{id} scopes work via existing JWT
validation.
ClickHouse
- sessions_v1 ReplacingMergeTree table
- SessionsReplicationService mirrors RunsReplicationService exactly:
logical replication with leader-locked consumer, ConcurrentFlushScheduler,
retry with exponential backoff + jitter, identical metric shape.
Dedicated slot + publication (sessions_to_clickhouse_v1[_publication]).
- SessionsRepository + ClickHouseSessionsRepository expose list, count,
tags with cursor pagination keyed by (created_at DESC, session_id DESC).
- Derived status (ACTIVE/CLOSED/EXPIRED) computed from closed_at + expires_at;
in-memory fallback on list results to catch pre-replication writes.
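The derived-status rule above can be sketched as a pure function. This is an illustrative reconstruction (names are hypothetical, not the repository code): `closed_at` wins outright, then `expires_at` is compared against now, and everything else is ACTIVE.

```typescript
// Hypothetical sketch of the derived status: CLOSED once closedAt is set
// (write-once), EXPIRED once expiresAt has passed, ACTIVE otherwise.
type SessionStatus = "ACTIVE" | "CLOSED" | "EXPIRED";

function deriveSessionStatus(
  closedAt: Date | null,
  expiresAt: Date | null,
  now: Date = new Date()
): SessionStatus {
  if (closedAt !== null) return "CLOSED"; // close is terminal, beats expiry
  if (expiresAt !== null && expiresAt <= now) return "EXPIRED";
  return "ACTIVE";
}
```

The same function can serve as the in-memory fallback on list results, recomputing status for rows written after the last replication flush.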
Verification
- Webapp typecheck 10/10
- Core + SDK build 3/3
- sessionsReplicationService.test.ts integration tests 2/2 (insert + update
round-trip via testcontainers)
- Live round-trip against local dev: create -> retrieve (friendlyId and
externalId) -> out.initialize -> out.append x2 -> in.send -> out.subscribe
(receives records) -> close -> ClickHouse sessions_v1 shows the replicated
row with closed_reason
- Live list smoke: tag, type, status CLOSED, externalId, and cursor pagination
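The cursor convention exercised by the list smoke above — pagination keyed by (created_at DESC, session_id DESC) — can be sketched in memory. All names here are illustrative; the real queries run in ClickHouse.

```typescript
// Sketch of (createdAt DESC, id DESC) cursor pagination: a page's cursor
// is its last row, and the next page takes rows strictly after it in
// that ordering.
type Row = { id: string; createdAt: number };

// true when `row` sorts after `cursor` under (createdAt DESC, id DESC)
function afterCursor(row: Row, cursor: Row): boolean {
  if (row.createdAt !== cursor.createdAt) return row.createdAt < cursor.createdAt;
  return row.id < cursor.id;
}

function page(rows: Row[], cursor: Row | null, limit: number): { items: Row[]; next: Row | null } {
  const sorted = [...rows].sort(
    (a, b) => b.createdAt - a.createdAt || (a.id < b.id ? 1 : a.id > b.id ? -1 : 0)
  );
  const filtered = cursor ? sorted.filter((r) => afterCursor(r, cursor)) : sorted;
  const items = filtered.slice(0, limit);
  // a full page implies there may be more; hand back the last row as cursor
  return { items, next: items.length === limit ? items[items.length - 1] : null };
}
```

The tie-break on `session_id` makes the cursor stable even when many sessions share a `created_at` value.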
…te/update

The `session_` prefix identifies internal friendlyIds. Allowing it in a user-supplied externalId would misroute subsequent GET/PATCH/close requests through resolveSessionByIdOrExternalId to a friendlyId lookup, returning null or the wrong session. Reject at the schema boundary so both routes surface a clean 422.
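The prefix-based routing and the schema guard described above can be sketched as follows. This is an assumption-laden sketch: the function names and the discriminated-union shape are illustrative, not the actual route code.

```typescript
// Polymorphic lookup: a `session_` prefix routes to the friendlyId
// column, anything else to externalId.
type SessionLookup =
  | { by: "friendlyId"; value: string }
  | { by: "externalId"; value: string };

function resolveSessionParam(param: string): SessionLookup {
  return param.startsWith("session_")
    ? { by: "friendlyId", value: param }
    : { by: "externalId", value: param };
}

// Schema-boundary guard: a user-supplied externalId must never collide
// with the internal prefix, otherwise later lookups would misroute.
function isValidExternalId(externalId: string): boolean {
  return !externalId.startsWith("session_");
}
```

With the guard in place, `resolveSessionParam` never has to disambiguate a user-chosen id from an internal one.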
Without allowJWT/corsStrategy, frontend clients holding public access tokens hit 401 on GET /api/v1/sessions and browser preflights fail. Matches the single-session GET/PATCH/close routes and the runs list endpoint.
- Derive isCached from the upsert result (id mismatch = pre-existing row) instead of doing a separate findFirst first. The pre-check was racy — two concurrent first-time POSTs could both return 201 with isCached: false. Using the returned row's id is atomic and saves a round-trip.
- Scope the list endpoint's authorization to the standard action/resource pattern (matches api.v1.runs.ts): task-scoped JWTs can list sessions filtered by their task, and broader super-scopes (read:sessions, read:all, admin) authorize unfiltered listing.
- Log and swallow unexpected errors on POST rather than returning the raw error.message. Prisma/internal messages can leak column names and query fragments.
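The isCached derivation can be modeled like this. The in-memory map stands in for the Prisma upsert keyed on (environmentId, externalId); every name here is illustrative.

```typescript
// Race-free isCached: generate a candidate id, upsert, and compare the
// returned row's id with the candidate. A mismatch means the row
// pre-existed — no separate findFirst, so two concurrent first-time
// POSTs cannot both observe "not cached".
type SessionRow = { id: string; externalId: string };

const table = new Map<string, SessionRow>(); // stand-in, keyed by externalId

function upsertSession(externalId: string, candidateId: string): SessionRow {
  const existing = table.get(externalId);
  if (existing) return existing; // upsert always returns the winning row
  const row = { id: candidateId, externalId };
  table.set(externalId, row);
  return row;
}

function createSession(externalId: string, candidateId: string) {
  const row = upsertSession(externalId, candidateId);
  return { row, isCached: row.id !== candidateId };
}
```

The atomicity comes from the upsert itself: whichever request wins the insert, both see the same returned id and derive consistent `isCached` values.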
Give Session channels run-engine waitpoint semantics so a task can
suspend while idle on a session channel and resume when an external
client sends a record — parallel to what streams.input offers
run-scoped streams.
Webapp
- POST /api/v1/runs/:runFriendlyId/session-streams/wait — creates a
manual waitpoint attached to {sessionId, io} and race-checks the S2
stream starting at lastSeqNum so pre-arrived data fires it
immediately. Mirrors the existing input-stream waitpoint route.
- sessionStreamWaitpointCache.server.ts — Redis set keyed on
{sessionFriendlyId, io}, drained atomically on each append so
concurrent multi-tab waiters all wake together.
- realtime.v1.sessions.$session.$io.append now drains pending
waitpoints after every record lands and completes each with the
appended body.
- S2RealtimeStreams.readSessionStreamRecords — session-channel
parallel of readRecords, feeds the race-check path.
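The wait/append interplay above can be sketched with in-memory stand-ins (the real code uses Redis for the waitpoint set and S2 for the stream; these names are hypothetical). The key ordering: register the waitpoint first, then race-check the stream from lastSeqNum, so a record that arrived in between is never missed.

```typescript
type StreamRecord = { seqNum: number; body: string };

const stream: StreamRecord[] = [];
const pendingWaitpoints = new Set<string>();

function readFrom(lastSeqNum: number): StreamRecord[] {
  return stream.filter((r) => r.seqNum > lastSeqNum);
}

// wait route: register, then race-check so pre-arrived data fires immediately
function wait(waitpointId: string, lastSeqNum: number): "completed" | "pending" {
  pendingWaitpoints.add(waitpointId);
  if (readFrom(lastSeqNum).length > 0) {
    pendingWaitpoints.delete(waitpointId);
    return "completed";
  }
  return "pending"; // the append path will drain it later
}

// append route: land the record, then atomically drain every pending waiter
function append(record: StreamRecord): string[] {
  stream.push(record);
  const drained = [...pendingWaitpoints];
  pendingWaitpoints.clear(); // concurrent multi-tab waiters all wake together
  return drained;
}
```

If the race-check instead ran before registration, a record appended in the gap would leave the waitpoint stranded until engine timeout.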
Core
- CreateSessionStreamWaitpoint request/response schemas alongside
the existing Session CRUD schemas. Server API contract only —
the client ApiClient + SDK wrapper ship on the AI-chat branch.
Two fixes needed by browser clients hitting the public session API
(TriggerChatTransport's direct accessToken path, WebSocket-less
session drivers, anything origin'd off the dashboard):
- POST /api/v1/sessions: allowJWT: true + corsStrategy: "all" on
the action. Pre-fix, the create endpoint only accepted secret-key
auth, so any browser-originated sessions.create(...) 401'd. The
loader (list) already had these; matches that shape.
- POST /realtime/v1/sessions/:session/:io/append: export both
{ action, loader } so Remix routes the OPTIONS preflight to the
route builder's CORS handler. With only { action } exported, the
preflight returns 400 'No loader for route' and Chrome surfaces
the follow-up POST as net::ERR_FAILED. Same pattern as
/api/v1/tasks/:id/trigger (which already exports both).
Validated by an end-to-end UI smoke on references/ai-chat:
new chat → send → streamed assistant reply in ~4s → second turn
reuses the same session + run, lastEventId advances 10 → 21.
Nine fixes from CodeRabbit + Devin review:
- api.v1.sessions.$session.close.ts:
- Export { action, loader } so CORS preflight reaches the route
builder's OPTIONS handler. Same fix already applied to the
append route — Devin caught that I'd missed this one. Without
the loader, browser clients hitting POST /close fail preflight.
- Switch to `prisma.session.updateMany({ where: { id, closedAt:
null }, ... })` so concurrent closes can't overwrite the
original `closedAt` / `closedReason`. Loser hits count === 0 and
re-reads the winning row — closedness is write-once at the DB
level. (CodeRabbit: TOCTOU.)
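The write-once semantics enforced by the `updateMany` guard can be modeled as a pure function. This is a sketch of the invariant, not the Prisma code: the first close wins, and a concurrent loser observes zero updated rows and re-reads the winning values.

```typescript
// Conditional update: only a session with closedAt still null can be
// closed, mirroring `updateMany({ where: { id, closedAt: null } })`.
type Closable = { closedAt: Date | null; closedReason: string | null };

function close(session: Closable, reason: string, now: Date): { updated: boolean } {
  if (session.closedAt !== null) {
    return { updated: false }; // loser: count === 0, winning row untouched
  }
  session.closedAt = now;
  session.closedReason = reason;
  return { updated: true };
}
```

Because the condition and the write happen in one statement at the database, there is no window where two closers both see `closedAt: null`.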
- entry.server.tsx:
Wrap the async `sessionsReplicationInstance.shutdown` in a sync
handler with `.catch(...)`. SIGTERM/SIGINT fire during process
teardown and a rejection from `_replicationClient.stop()` would
become an unhandled promise rejection. Matches the pattern in
`dynamicFlushScheduler.server.ts`. (CodeRabbit: unhandled rejection
risk.)
- api.v1.runs.$runFriendlyId.session-streams.wait.ts:
- Swallowed race-check catch now logs `warn` with
sessionFriendlyId / io / waitpointId / error. Silent failures in
the S2-read / engine-complete / cache-remove path were
indistinguishable from the expected cache-drain-on-append fast
path.
- Outer 500 path no longer forwards `error.message` (Prisma /
engine / S2 internals could leak). Logs server-side and returns
a generic "Something went wrong"; 422 ServiceValidationError
path unchanged. (CodeRabbit: info-leak + logging gap.)
- realtime.v1.sessions.$session.$io.ts:
Add `method: "PUT"` to the route config so the route builder
enforces method validation before the handler runs. Removed the
now-redundant `request.method !== "PUT"` check inside the handler.
(CodeRabbit: defense-in-depth.)
- services/sessionsRepository/sessionsRepository.server.ts:
`ISessionsRepository` is now a `type` alias, per repo coding
guideline ("use types over interfaces"). Structural-typing means
implementing classes don't need source changes. (CodeRabbit.)
- services/sessionStreamWaitpointCache.server.ts:
Replace separate SADD + PEXPIRE with a single atomic Lua script.
Solves two distinct concerns at once:
1. Partial-failure window (CodeRabbit): if SADD succeeded and
PEXPIRE failed, the key would persist with no TTL. The Lua
script fails both or succeeds both.
2. TTL-race (Devin, twice): each waitpoint registers with its own
`ttlMs` derived from the caller's timeout. The old code called
PEXPIRE unconditionally, so a short-TTL registration would
shrink the shared key's TTL below a longer-TTL sibling —
evicting the sibling from Redis and degrading the append-path
fast drain to engine-timeout-only. The script only PEXPIREs if
the new TTL is greater than the current PTTL (or the key has
no TTL yet), so the key lives as long as the longest-TTL
member.
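The behavior described above can be sketched as follows. The Lua body is an illustrative reconstruction of the described semantics (single round-trip SADD plus conditional PEXPIRE), not the shipped script, and the TypeScript helper models just the TTL rule so it can be checked without Redis.

```typescript
// Illustrative Lua: add the member, then only extend the key's TTL —
// never shrink it below a longer-TTL sibling's. PTTL returns -1 for
// "no TTL" and -2 for "no key", so current < 0 means "set it".
const registerWaitpointLua = `
redis.call('SADD', KEYS[1], ARGV[1])
local requested = tonumber(ARGV[2])
local current = redis.call('PTTL', KEYS[1])
if current < 0 or requested > current then
  redis.call('PEXPIRE', KEYS[1], requested)
end
return 1
`;

// Pure model of the conditional-PEXPIRE rule used by the script above.
function nextTtl(currentPttlMs: number, requestedTtlMs: number): number {
  if (currentPttlMs < 0 || requestedTtlMs > currentPttlMs) return requestedTtlMs;
  return currentPttlMs; // shorter registration leaves the longer TTL alone
}
```

Running both commands inside one script also closes the partial-failure window: Redis executes Lua scripts atomically, so SADD cannot succeed while the expiry step is lost.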
Outstanding: one unresolved thread asking to rename
`CloseSessionRequestBody.reason` → `closedReason` for symmetry with
the DB column. Holding that for an API-taste call — will follow up.
Validated: `pnpm run typecheck --filter webapp` clean.
What this enables
A new first-class primitive, Session, for durable bidirectional I/O that outlives a single run. Sessions give you a server-managed channel pair (`.out` from the task, `.in` from the client) that you can write to, read from, and subscribe to across many runs, filter, list, and close, all through a single identifier.

Use cases unblocked
- … the task reads `.in`, the client writes to `.in`, and the server enforces no-writes-after-close.
- … `.out` after the task finishes to replay the history.

Public API surface

Control plane
- `POST /api/v1/sessions` to create. Idempotent when you supply `externalId`.
- `GET /api/v1/sessions/:session` to retrieve by friendlyId (`session_abc`) or by your own `externalId`. The server disambiguates via the `session_` prefix.
- `GET /api/v1/sessions` to list with filters (`type`, `tag`, `taskIdentifier`, `externalId`, derived `status` = ACTIVE/CLOSED/EXPIRED, created-at period/from/to) and cursor pagination. Backed by ClickHouse.
- `PATCH /api/v1/sessions/:session` to update tags/metadata/externalId.
- `POST /api/v1/sessions/:session/close` to terminate. Idempotent, hard-blocks new server-brokered writes.

Realtime
- `PUT /realtime/v1/sessions/:session/:io` to initialize a channel. Returns S2 credentials in headers so clients can write direct to S2 for high-throughput cases.
- `GET /realtime/v1/sessions/:session/:io` for SSE subscribe.
- `POST /realtime/v1/sessions/:session/:io/append` for server-side appends.

Scopes
- `sessions` is now a ResourceType. `read:sessions:{id}`, `write:sessions:{id}`, `admin:sessions:{id}` all flow through the existing JWT validator.

Implementation summary

Postgres (`Session` table)
- Point-lookup indexes only: `friendlyId` unique, `(env, externalId)` unique, `expiresAt`. List queries are served from ClickHouse, so Postgres stays insert-heavy.
- `closedAt`, `closedReason`, `expiresAt` are write-once. No status enum, no counters, no currentRunId pointer. All run-related state is derived.

ClickHouse (`sessions_v1`)
- Keyed on `(org_id, project_id, environment_id, created_at, session_id)`. `tags` indexed with a tokenbf_v1 skip index.
- `SessionsReplicationService` mirrors `RunsReplicationService` exactly: logical replication with leader-locked consumer, `ConcurrentFlushScheduler`, retry with exponential backoff + jitter, identical metric shape. Dedicated slot + publication so the two consume independently.
- `SessionsRepository` + `ClickHouseSessionsRepository` expose list / count / tags with the same cursor pagination convention as runs and waitpoints.

S2
- Stream keys: `sessions/{friendlyId}/{out|in}`. The existing `runs/{runId}/{streamId}` format for implicit run streams is completely untouched.

What did not change
- `streams.pipe`/`streams.input` still behave exactly as before. They do not create Session rows and the existing routes are unchanged. Sessions are a net-new primitive for the next phase of agent features, not a reshaping of the current streams API.

Verification
- `apps/webapp/test/sessionsReplicationService.test.ts` exercises insert and update round-trips through Postgres logical replication into ClickHouse via testcontainers.
- Live round-trip: `.out.initialize`, `.out.append` x2, `.in.send`, `.out.subscribe` over SSE, list (type, tag, status, externalId, pagination), close, idempotent re-close. Replicated row lands in ClickHouse within ~1s with `closed_reason` intact.

Not in this PR
- … (`chat.agent`).
- … `chat.agent` integration.

Test plan
- `pnpm run typecheck --filter webapp`
- `pnpm run test --filter webapp ./test/sessionsReplicationService.test.ts --run`
- Boot with `SESSION_REPLICATION_CLICKHOUSE_URL` and `SESSION_REPLICATION_ENABLED=1` set. Confirm the slot and publication auto-create on boot.
- `POST /api/v1/sessions` and verify the row replicates to `trigger_dev.sessions_v1` within a couple of seconds.
- `POST /api/v1/sessions/:id/close` and confirm subsequent `POST /realtime/v1/sessions/:id/out/append` returns 400.