-
-
Notifications
You must be signed in to change notification settings - Fork 1.2k
feat: Sessions - bidirectional durable agent streams #3417
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
ericallam
wants to merge
8
commits into
main
Choose a base branch
from
feature/tri-8627-session-primitive-server-side-schema-routes-clickhouse
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
8 commits
Select commit
Hold shift + click to select a range
edded41
feat(webapp,clickhouse,database,core): Session primitive (server side)
ericallam f6149c9
code review fixes
ericallam a724f9c
fix(core): reject externalId starting with 'session_' on Session crea…
ericallam fceabc1
fix(webapp): allow JWT + CORS on sessions list endpoint
ericallam 2f8903a
fix(webapp): tighten sessions create + list auth
ericallam aaab958
feat(webapp,core): Session channel waitpoints — server side
ericallam 4f2c0e7
fix(webapp): CORS + allowJWT on public session create + append preflight
ericallam 84d3db1
fix(webapp): address #3417 PR review feedback
ericallam File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| --- | ||
| "@trigger.dev/core": patch | ||
| --- | ||
|
|
||
| Add `SessionId` friendly ID generator and schemas for the new durable Session primitive. Exported from `@trigger.dev/core/v3/isomorphic` alongside `RunId`, `BatchId`, etc. Ships the `CreateSessionStreamWaitpoint` request/response schemas alongside the main Session CRUD. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,8 @@ | ||
| --- | ||
| area: webapp | ||
| type: feature | ||
| --- | ||
|
|
||
| Add `Session` primitive — a durable, typed, bidirectional I/O primitive that outlives a single run, intended for agent/chat use cases. Ships the Postgres schema (`Session` table), control-plane CRUD routes (`POST/GET/PATCH /api/v1/sessions`, `POST /api/v1/sessions/:session/close` — polymorphic on friendlyId or externalId), `sessions` JWT scope, ClickHouse `sessions_v1` table, and `SessionsReplicationService` (logical replication from Postgres `Session` → ClickHouse `sessions_v1`). Run-scoped realtime streams (`streams.pipe`/`streams.input`) are unchanged and do **not** create Session rows. | ||
|
|
||
| Adds `POST /api/v1/runs/:runFriendlyId/session-streams/wait` (session-stream waitpoint creation) and wires `POST /realtime/v1/sessions/:session/:io/append` to fire any pending waitpoints on the channel. Gives `session.in` run-engine waitpoint semantics matching run-scoped input streams: a task can suspend while idle on a session channel and resume when an external client sends a record. Redis-backed pending-waitpoint set (`ssw:{sessionFriendlyId}:{io}`) is drained atomically on each append so multiple concurrent waiters (e.g. multi-tab chat) all resume together. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,11 @@ | ||
| --- | ||
| area: webapp | ||
| type: fix | ||
| --- | ||
|
|
||
| CORS + preflight parity on the public session API so browser-side chat transports can hit the session endpoints without being blocked: | ||
|
|
||
| - `POST /api/v1/sessions` (session upsert) gains `allowJWT: true` + `corsStrategy: "all"` so PATs minted by `chat.createTriggerAction` (and other browser-side session flows) pass the route's auth + respond to CORS preflight. Previously this route only accepted secret-key auth, which broke any browser-originated `sessions.create(...)` call — including the transport's direct `accessToken` fallback path. | ||
| - `POST /realtime/v1/sessions/:session/:io/append` now exports both `{ action, loader }`. The route builder installs the OPTIONS preflight handler on the `loader` even for write-only routes; without the loader export, the CORS preflight was returning 400 ("No loader for route") and Chrome treated the follow-up `POST` as `net::ERR_FAILED`. | ||
|
|
||
| Validated by an end-to-end UI smoke against the `references/ai-chat` app: brand-new chat → send → streamed assistant reply in ~4s → follow-up turn on the same session → `lastEventId` advances from 10 → 21. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
173 changes: 173 additions & 0 deletions
173
apps/webapp/app/routes/api.v1.runs.$runFriendlyId.session-streams.wait.ts
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,173 @@ | ||
| import { json } from "@remix-run/server-runtime"; | ||
| import { | ||
| CreateSessionStreamWaitpointRequestBody, | ||
| type CreateSessionStreamWaitpointResponseBody, | ||
| } from "@trigger.dev/core/v3"; | ||
| import { WaitpointId } from "@trigger.dev/core/v3/isomorphic"; | ||
| import { z } from "zod"; | ||
| import { $replica } from "~/db.server"; | ||
| import { createWaitpointTag, MAX_TAGS_PER_WAITPOINT } from "~/models/waitpointTag.server"; | ||
| import { resolveSessionByIdOrExternalId } from "~/services/realtime/sessions.server"; | ||
| import { S2RealtimeStreams } from "~/services/realtime/s2realtimeStreams.server"; | ||
| import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server"; | ||
| import { | ||
| addSessionStreamWaitpoint, | ||
| removeSessionStreamWaitpoint, | ||
| } from "~/services/sessionStreamWaitpointCache.server"; | ||
| import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server"; | ||
| import { logger } from "~/services/logger.server"; | ||
| import { parseDelay } from "~/utils/delays"; | ||
| import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server"; | ||
| import { engine } from "~/v3/runEngine.server"; | ||
| import { ServiceValidationError } from "~/v3/services/baseService.server"; | ||
|
|
||
| const ParamsSchema = z.object({ | ||
| runFriendlyId: z.string(), | ||
| }); | ||
|
|
||
| const { action, loader } = createActionApiRoute( | ||
| { | ||
| params: ParamsSchema, | ||
| body: CreateSessionStreamWaitpointRequestBody, | ||
| maxContentLength: 1024 * 10, // 10KB | ||
| method: "POST", | ||
| }, | ||
| async ({ authentication, body, params }) => { | ||
| try { | ||
| const run = await $replica.taskRun.findFirst({ | ||
| where: { | ||
| friendlyId: params.runFriendlyId, | ||
| runtimeEnvironmentId: authentication.environment.id, | ||
| }, | ||
| select: { | ||
| id: true, | ||
| friendlyId: true, | ||
| realtimeStreamsVersion: true, | ||
| }, | ||
| }); | ||
|
|
||
| if (!run) { | ||
| return json({ error: "Run not found" }, { status: 404 }); | ||
| } | ||
|
|
||
| const session = await resolveSessionByIdOrExternalId( | ||
| $replica, | ||
| authentication.environment.id, | ||
| body.session | ||
| ); | ||
|
|
||
| if (!session) { | ||
| return json({ error: "Session not found" }, { status: 404 }); | ||
| } | ||
|
|
||
| const idempotencyKeyExpiresAt = body.idempotencyKeyTTL | ||
| ? resolveIdempotencyKeyTTL(body.idempotencyKeyTTL) | ||
| : undefined; | ||
|
|
||
| const timeout = await parseDelay(body.timeout); | ||
|
|
||
| const bodyTags = typeof body.tags === "string" ? [body.tags] : body.tags; | ||
|
|
||
| if (bodyTags && bodyTags.length > MAX_TAGS_PER_WAITPOINT) { | ||
| throw new ServiceValidationError( | ||
| `Waitpoints can only have ${MAX_TAGS_PER_WAITPOINT} tags, you're trying to set ${bodyTags.length}.` | ||
| ); | ||
| } | ||
|
|
||
| if (bodyTags && bodyTags.length > 0) { | ||
| for (const tag of bodyTags) { | ||
| await createWaitpointTag({ | ||
| tag, | ||
| environmentId: authentication.environment.id, | ||
| projectId: authentication.environment.projectId, | ||
| }); | ||
| } | ||
| } | ||
|
|
||
| // Step 1: Create the waitpoint. | ||
| const result = await engine.createManualWaitpoint({ | ||
| environmentId: authentication.environment.id, | ||
| projectId: authentication.environment.projectId, | ||
| idempotencyKey: body.idempotencyKey, | ||
| idempotencyKeyExpiresAt, | ||
| timeout, | ||
| tags: bodyTags, | ||
| }); | ||
|
|
||
| // Step 2: Register the waitpoint on the session channel so the next | ||
| // append fires it. Keyed by (sessionFriendlyId, io) — both runs on a | ||
| // multi-tab session wake on the same record. | ||
| const ttlMs = timeout ? timeout.getTime() - Date.now() : undefined; | ||
| await addSessionStreamWaitpoint( | ||
| session.friendlyId, | ||
| body.io, | ||
| result.waitpoint.id, | ||
| ttlMs && ttlMs > 0 ? ttlMs : undefined | ||
| ); | ||
|
|
||
| // Step 3: Race-check. If a record landed on the channel before this | ||
| // .wait() call, complete the waitpoint synchronously with that data | ||
| // and remove the pending registration. | ||
| if (!result.isCached) { | ||
| try { | ||
| const realtimeStream = getRealtimeStreamInstance( | ||
| authentication.environment, | ||
| run.realtimeStreamsVersion | ||
| ); | ||
|
|
||
| if (realtimeStream instanceof S2RealtimeStreams) { | ||
| const records = await realtimeStream.readSessionStreamRecords( | ||
| session.friendlyId, | ||
| body.io, | ||
| body.lastSeqNum | ||
| ); | ||
|
|
||
| if (records.length > 0) { | ||
| const record = records[0]!; | ||
|
|
||
| await engine.completeWaitpoint({ | ||
| id: result.waitpoint.id, | ||
| output: { | ||
| value: record.data, | ||
| type: "application/json", | ||
| isError: false, | ||
| }, | ||
| }); | ||
|
|
||
| await removeSessionStreamWaitpoint( | ||
| session.friendlyId, | ||
| body.io, | ||
| result.waitpoint.id | ||
| ); | ||
| } | ||
| } | ||
| } catch (error) { | ||
| // Non-fatal: pending registration stays in Redis; the next append | ||
| // will complete the waitpoint via the append handler path. Log so | ||
| // a broken race-check doesn't silently degrade to timeout-only. | ||
| logger.warn("session-stream wait race-check failed", { | ||
| sessionFriendlyId: session.friendlyId, | ||
| io: body.io, | ||
| waitpointId: WaitpointId.toFriendlyId(result.waitpoint.id), | ||
| error, | ||
| }); | ||
| } | ||
| } | ||
|
|
||
| return json<CreateSessionStreamWaitpointResponseBody>({ | ||
| waitpointId: WaitpointId.toFriendlyId(result.waitpoint.id), | ||
| isCached: result.isCached, | ||
| }); | ||
| } catch (error) { | ||
| if (error instanceof ServiceValidationError) { | ||
| return json({ error: error.message }, { status: 422 }); | ||
| } | ||
| // Don't forward raw internal error messages (could leak Prisma/engine | ||
| // details). Log server-side and return a generic 500. | ||
| logger.error("Failed to create session-stream waitpoint", { error }); | ||
| return json({ error: "Something went wrong" }, { status: 500 }); | ||
| } | ||
| } | ||
| ); | ||
|
|
||
| export { action, loader }; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,73 @@ | ||
| import { json } from "@remix-run/server-runtime"; | ||
| import { | ||
| CloseSessionRequestBody, | ||
| type RetrieveSessionResponseBody, | ||
| } from "@trigger.dev/core/v3"; | ||
| import { z } from "zod"; | ||
| import { prisma } from "~/db.server"; | ||
| import { | ||
| resolveSessionByIdOrExternalId, | ||
| serializeSession, | ||
| } from "~/services/realtime/sessions.server"; | ||
| import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server"; | ||
|
|
||
| const ParamsSchema = z.object({ | ||
| session: z.string(), | ||
| }); | ||
|
|
||
| const { action, loader } = createActionApiRoute( | ||
| { | ||
| params: ParamsSchema, | ||
| body: CloseSessionRequestBody, | ||
| maxContentLength: 1024, | ||
| method: "POST", | ||
| allowJWT: true, | ||
| corsStrategy: "all", | ||
| authorization: { | ||
| action: "admin", | ||
| resource: (params) => ({ sessions: params.session }), | ||
| superScopes: ["admin:sessions", "admin:all", "admin"], | ||
| }, | ||
| }, | ||
| async ({ authentication, params, body }) => { | ||
| const existing = await resolveSessionByIdOrExternalId( | ||
| prisma, | ||
| authentication.environment.id, | ||
| params.session | ||
| ); | ||
|
|
||
| if (!existing) { | ||
| return json({ error: "Session not found" }, { status: 404 }); | ||
| } | ||
|
|
||
| // Idempotent: if already closed, return the current row without clobbering | ||
| // the original closedAt / closedReason. | ||
| if (existing.closedAt) { | ||
| return json<RetrieveSessionResponseBody>(serializeSession(existing)); | ||
| } | ||
|
|
||
| // `closedAt: null` on the where clause makes the update conditional at | ||
| // the DB level. Two concurrent closes race through the earlier read, | ||
| // but only one can win this update — the loser hits `count === 0` and | ||
| // falls back to reading the winning row. Closedness is write-once. | ||
| const { count } = await prisma.session.updateMany({ | ||
| where: { id: existing.id, closedAt: null }, | ||
| data: { | ||
| closedAt: new Date(), | ||
| closedReason: body.reason ?? null, | ||
| }, | ||
| }); | ||
|
|
||
| if (count === 0) { | ||
| const final = await prisma.session.findFirst({ where: { id: existing.id } }); | ||
| if (!final) return json({ error: "Session not found" }, { status: 404 }); | ||
| return json<RetrieveSessionResponseBody>(serializeSession(final)); | ||
| } | ||
|
|
||
| const updated = await prisma.session.findFirst({ where: { id: existing.id } }); | ||
| if (!updated) return json({ error: "Session not found" }, { status: 404 }); | ||
| return json<RetrieveSessionResponseBody>(serializeSession(updated)); | ||
| } | ||
| ); | ||
|
|
||
| export { action, loader }; | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.