From 2a81e84e394d55958582c452620d4226f2ef3cdc Mon Sep 17 00:00:00 2001 From: Remon Oldenbeuving Date: Tue, 30 Jun 2026 12:35:18 +0200 Subject: [PATCH] fix(session-ingest): shrink cli_sessions_v2 status row lock Reduce per-session row lock hold time on cli_sessions_v2 by removing the multi-step JS transaction around status metadata. Status transitions now run as a single SELECT ... FOR UPDATE + UPDATE ... RETURNING CTE, no-op status writes no longer stamp timestamps or lock the row, and parent and non-status metadata updates no longer execute while the status lock is held. --- .../session-ingest/src/queue-consumer.test.ts | 158 ++++++---- services/session-ingest/src/queue-consumer.ts | 296 ++++++++++-------- .../src/types/user-connection-protocol.ts | 1 + 3 files changed, 261 insertions(+), 194 deletions(-) diff --git a/services/session-ingest/src/queue-consumer.test.ts b/services/session-ingest/src/queue-consumer.test.ts index 7fc80c8fcb..76290c306e 100644 --- a/services/session-ingest/src/queue-consumer.test.ts +++ b/services/session-ingest/src/queue-consumer.test.ts @@ -53,6 +53,26 @@ import { const encoder = new TextEncoder(); +function mockDbWithSessionGuard(overrides: Record = {}) { + const limit = vi.fn(async () => [{ session_id: 'ses_exists' }]); + const where = vi.fn(() => ({ limit })); + const from = vi.fn(() => ({ where })); + const select = vi.fn(() => ({ from })); + return { select, ...overrides }; +} + +function createUpdateReturningMock(rows: unknown[] = []) { + const updateQuery = { + set: vi.fn(() => updateQuery), + where: vi.fn(() => updateQuery), + returning: vi.fn(async () => rows), + }; + return { + update: vi.fn(() => updateQuery), + updateQuery, + }; +} + function feedAll(extractor: ReturnType, json: string) { extractor.tokenizer.write(encoder.encode(json)); extractor.tokenizer.end(); @@ -382,12 +402,8 @@ describe('queue', () => { ); vi.mocked(getSessionIngestDO).mockReturnValue({ ingest } as never); - const transaction = vi.fn(async () => null); - const limit = vi.fn(async () => [{ session_id: 'ses_split' }]); - const where = vi.fn(() => ({ limit })); - const from = vi.fn(() => ({ where })); - const select = vi.fn(() => ({ from })); - vi.mocked(getWorkerDb).mockReturnValue({ select, transaction } as never); + const { update } = createUpdateReturningMock(); + vi.mocked(getWorkerDb).mockReturnValue(mockDbWithSessionGuard({ update }) as never); const items = Array.from({ length: 129 }, (_, i) => ({ type: 'message', @@ -445,7 +461,7 @@ describe('queue', () => { 1, undefined ); - expect(transaction).toHaveBeenCalledTimes(1); + expect(update).toHaveBeenCalledTimes(1); expect(deleteObject).toHaveBeenCalledWith('staging/split'); expect(ack).toHaveBeenCalledTimes(1); expect(retry).not.toHaveBeenCalled(); @@ -563,12 +579,8 @@ describe('queue', () => { const ingest = vi.fn(async () => ({ changes: [] })); vi.mocked(getSessionIngestDO).mockReturnValue({ ingest } as never); - const transaction = vi.fn(async () => null); - const limit = vi.fn(async () => [{ session_id: 'ses_malformed' }]); - const where = vi.fn(() => ({ limit })); - const from = vi.fn(() => ({ where })); - const select = vi.fn(() => ({ from })); - vi.mocked(getWorkerDb).mockReturnValue({ select, transaction } as never); + const { update } = createUpdateReturningMock(); + vi.mocked(getWorkerDb).mockReturnValue(mockDbWithSessionGuard({ update }) as never); const body = '{"data":[{"type":"message","data":{"id":"msg_1"}},broken'; const deleteObject = vi.fn(async () => undefined); @@ -604,7 +616,7 @@ describe('queue', () => { ); expect(ingest).not.toHaveBeenCalled(); - expect(transaction).not.toHaveBeenCalled(); + expect(update).not.toHaveBeenCalled(); expect(retry).toHaveBeenCalledWith({ delaySeconds: QUEUE_RETRY_DELAY_SECONDS }); expect(ack).not.toHaveBeenCalled(); expect(deleteObject).not.toHaveBeenCalled(); @@ -623,13 +635,9 @@ describe('queue', () => { }); vi.mocked(getSessionIngestDO).mockReturnValue({ ingest } as never); - // db.select powers the session-exists guard; db.transaction is the metadata flush. - const transaction = vi.fn(async () => null); - const limit = vi.fn(async () => [{ session_id: 'ses_partial' }]); - const where = vi.fn(() => ({ limit })); - const from = vi.fn(() => ({ where })); - const select = vi.fn(() => ({ from })); - vi.mocked(getWorkerDb).mockReturnValue({ select, transaction } as never); + // db.select powers the session-exists guard; db.update is the metadata flush. + const { update } = createUpdateReturningMock(); + vi.mocked(getWorkerDb).mockReturnValue(mockDbWithSessionGuard({ update }) as never); // 129 items -> chunk 1 = 128 items (flush succeeds), chunk 2 = 1 item (flush throws). const items = Array.from({ length: 129 }, (_, i) => ({ @@ -673,7 +681,7 @@ describe('queue', () => { const firstChunkItems = (ingest.mock.calls[0] as unknown[])[0]; expect(firstChunkItems).toHaveLength(128); // Its metadata change was flushed to Postgres despite the later failure. - expect(transaction).toHaveBeenCalledTimes(1); + expect(update).toHaveBeenCalledTimes(1); // The message is retried and the staging object is preserved for reprocessing. expect(retry).toHaveBeenCalledWith({ delaySeconds: QUEUE_RETRY_DELAY_SECONDS }); expect(ack).not.toHaveBeenCalled(); @@ -682,28 +690,22 @@ describe('queue', () => { }); describe('computeSessionMetadataUpdates', () => { - const fixedNow = () => '2026-05-05T00:00:00.000Z'; - it('normalizes gitUrl to the canonical form before persisting', () => { const updates = computeSessionMetadataUpdates( - new Map([['gitUrl', 'https://GitHub.com/ACME/Widgets.git']]), - fixedNow + new Map([['gitUrl', 'https://GitHub.com/ACME/Widgets.git']]) ); expect(updates.git_url).toBe('https://github.com/acme/widgets'); }); it('collapses scp-style and ssh:// URLs to the same normalized form as https', () => { const fromScp = computeSessionMetadataUpdates( - new Map([['gitUrl', 'git@github.com:acme/widgets.git']]), - fixedNow + new Map([['gitUrl', 'git@github.com:acme/widgets.git']]) ); const fromSsh = computeSessionMetadataUpdates( - new Map([['gitUrl', 'ssh://git@github.com/acme/widgets.git']]), - fixedNow + new Map([['gitUrl', 'ssh://git@github.com/acme/widgets.git']]) ); const fromHttps = computeSessionMetadataUpdates( - new Map([['gitUrl', 'https://github.com/acme/widgets']]), - fixedNow + new Map([['gitUrl', 'https://github.com/acme/widgets']]) ); expect(fromScp.git_url).toBe('https://github.com/acme/widgets'); expect(fromSsh.git_url).toBe(fromScp.git_url); @@ -711,7 +713,7 @@ describe('computeSessionMetadataUpdates', () => { }); it('writes null git_url when the ingest cleared the field', () => { - const updates = computeSessionMetadataUpdates(new Map([['gitUrl', null]]), fixedNow); + const updates = computeSessionMetadataUpdates(new Map([['gitUrl', null]])); expect(updates.git_url).toBeNull(); }); @@ -720,22 +722,21 @@ describe('computeSessionMetadataUpdates', () => { new Map([ ['gitBranch', 'feature/x'], ['title', 'hello'], - ]), - fixedNow + ]) ); expect('git_url' in updates).toBe(false); expect(updates.git_branch).toBe('feature/x'); expect(updates.title).toBe('hello'); }); - it('stamps status_updated_at when status changes', () => { - const updates = computeSessionMetadataUpdates(new Map([['status', 'running']]), fixedNow); - expect(updates.status).toBe('running'); - expect(updates.status_updated_at).toBe('2026-05-05T00:00:00.000Z'); + it('does not include status fields in the generic metadata update', () => { + const updates = computeSessionMetadataUpdates(new Map([['status', 'running']])); + expect('status' in updates).toBe(false); + expect('status_updated_at' in updates).toBe(false); }); it('ignores a null "platform" change (creation value stays sticky)', () => { - const updates = computeSessionMetadataUpdates(new Map([['platform', null]]), fixedNow); + const updates = computeSessionMetadataUpdates(new Map([['platform', null]])); expect('created_on_platform' in updates).toBe(false); }); }); @@ -755,33 +756,10 @@ describe('queue status notifications', () => { parent_session_id: null, status: 'idle', status_updated_at: '2026-05-05T00:00:01.000Z', + previous_status: 'busy', }; - const selectResults: unknown[][] = [ - [{ session_id: persistedSession.session_id, status: 'idle' }], - [{ status: 'busy' }], - [persistedSession], - ]; - const selectResult = vi.fn(async () => selectResults.shift() ?? []); - const select = { - from: vi.fn(() => select), - where: vi.fn(() => select), - limit: vi.fn(() => select), - for: vi.fn(() => select), - then: vi.fn((resolve: (value: unknown) => unknown) => resolve(selectResult())), - }; - const update = { - set: vi.fn(() => update), - where: vi.fn(() => update), - then: vi.fn((resolve: (value: undefined) => unknown) => resolve(undefined)), - }; - const dbRef: Record = {}; - const db = { - select: vi.fn(() => select), - update: vi.fn(() => update), - transaction: vi.fn(async (fn: (tx: unknown) => Promise) => fn(dbRef)), - } as unknown as ReturnType; - Object.assign(dbRef, db); - vi.mocked(getWorkerDb).mockReturnValue(db); + const execute = vi.fn(async () => ({ rows: [persistedSession] })); + vi.mocked(getWorkerDb).mockReturnValue(mockDbWithSessionGuard({ execute }) as never); vi.mocked(getSessionIngestDO).mockReturnValue({ ingest: vi.fn(async () => ({ changes: [{ name: 'status', value: 'idle' }] })), } as never); @@ -825,6 +803,7 @@ describe('queue status notifications', () => { await queue(batch, env, ctx); expect(ack).toHaveBeenCalledTimes(1); + expect(execute).toHaveBeenCalledTimes(1); expect(notifyUserSessionEvent).toHaveBeenCalledWith( env, 'usr_test', @@ -835,4 +814,51 @@ describe('queue status notifications', () => { ctx ); }); + + it('does not notify or stamp timestamps when the locked status is unchanged', async () => { + vi.mocked(notifyUserSessionEvent).mockClear(); + const execute = vi.fn(async () => ({ rows: [] })); + vi.mocked(getWorkerDb).mockReturnValue(mockDbWithSessionGuard({ execute }) as never); + vi.mocked(getSessionIngestDO).mockReturnValue({ + ingest: vi.fn(async () => ({ changes: [{ name: 'status', value: 'idle' }] })), + } as never); + + const body = JSON.stringify({ data: [{ type: 'session_status', data: { status: 'idle' } }] }); + const env = { + HYPERDRIVE: { connectionString: 'postgres://test' }, + SESSION_INGEST_R2: { + get: vi.fn(async () => new Response(body)), + delete: vi.fn(async () => undefined), + put: vi.fn(async () => undefined), + }, + } as never; + const ack = vi.fn(); + const retry = vi.fn(); + const ctx = { waitUntil: vi.fn() } as unknown as ExecutionContext; + + await queue( + { + messages: [ + { + body: { + r2Key: 'ingest/status-noop', + kiloUserId: 'usr_test', + sessionId: 'ses_12345678901234567890123456', + ingestVersion: 1, + ingestedAt: 1, + }, + ack, + retry, + }, + ], + } as never, + env, + ctx + ); + + expect(ack).toHaveBeenCalledTimes(1); + expect(retry).not.toHaveBeenCalled(); + expect(execute).toHaveBeenCalledTimes(1); + expect(notifyUserSessionEvent).not.toHaveBeenCalled(); + }); }); diff --git a/services/session-ingest/src/queue-consumer.ts b/services/session-ingest/src/queue-consumer.ts index a56e8d0273..2b493db1a1 100644 --- a/services/session-ingest/src/queue-consumer.ts +++ b/services/session-ingest/src/queue-consumer.ts @@ -9,8 +9,12 @@ import { getItemIdentity } from './util/compaction'; import { MAX_INGEST_ITEM_BYTES, MAX_SINGLE_ITEM_BYTES } from './util/ingest-limits'; import { getSessionIngestDO } from './dos/SessionIngestDO'; import { withDORetry, normalizeGitUrl } from '@kilocode/worker-utils'; -import { mapSessionEventRow, notifyUserSessionEvent } from './session-events'; -import { SessionStatusSchema } from './types/user-connection-protocol'; +import { + mapSessionEventRow, + notifyUserSessionEvent, + type SessionEventDbRow, +} from './session-events'; +import { SessionStatusSchema, type SessionStatus } from './types/user-connection-protocol'; export interface IngestQueueMessage { r2Key: string; @@ -398,27 +402,33 @@ function createIngestChunker( type SessionMetadataUpdates = Partial< Pick< typeof cli_sessions_v2.$inferInsert, - | 'title' - | 'created_on_platform' - | 'organization_id' - | 'git_url' - | 'git_branch' - | 'status' - | 'status_updated_at' + 'title' | 'created_on_platform' | 'organization_id' | 'git_url' | 'git_branch' > >; +const sessionEventReturningColumns = { + session_id: cli_sessions_v2.session_id, + created_at: cli_sessions_v2.created_at, + updated_at: cli_sessions_v2.updated_at, + title: cli_sessions_v2.title, + created_on_platform: cli_sessions_v2.created_on_platform, + organization_id: cli_sessions_v2.organization_id, + git_url: cli_sessions_v2.git_url, + git_branch: cli_sessions_v2.git_branch, + parent_session_id: cli_sessions_v2.parent_session_id, + status: cli_sessions_v2.status, + status_updated_at: cli_sessions_v2.status_updated_at, +}; + /** * Build the `cli_sessions_v2` partial update from a set of metadata changes. * * `git_url` is passed through `normalizeGitUrl` on write so that the * `github_branch_pull_requests` cache (keyed on the canonical form) can - * match new sessions without per-read normalization. Status bumps carry - * `status_updated_at = now()`. + * match new sessions without per-read normalization. */ export function computeSessionMetadataUpdates( - mergedChanges: Map, - now: () => string = () => new Date().toISOString() + mergedChanges: Map ): SessionMetadataUpdates { const updates: SessionMetadataUpdates = {}; @@ -439,14 +449,136 @@ export function computeSessionMetadataUpdates( if (mergedChanges.has('gitBranch')) { updates.git_branch = mergedChanges.get('gitBranch') ?? null; } - if (mergedChanges.has('status')) { - updates.status = mergedChanges.get('status') ?? null; - updates.status_updated_at = now(); - } return updates; } +async function updateSessionMetadata( + db: ReturnType, + kiloUserId: string, + sessionId: string, + updates: SessionMetadataUpdates +): Promise { + if (Object.keys(updates).length === 0) return null; + + const [row] = await db + .update(cli_sessions_v2) + .set(updates) + .where( + and(eq(cli_sessions_v2.session_id, sessionId), eq(cli_sessions_v2.kilo_user_id, kiloUserId)) + ) + .returning(sessionEventReturningColumns); + return row ?? null; +} + +async function updateSessionParent( + db: ReturnType, + kiloUserId: string, + sessionId: string, + parentSessionId: string | null | undefined +): Promise { + if (parentSessionId === undefined) return null; + + if (parentSessionId && parentSessionId !== sessionId) { + const parentRows = await db + .select({ session_id: cli_sessions_v2.session_id }) + .from(cli_sessions_v2) + .where( + and( + eq(cli_sessions_v2.session_id, parentSessionId), + eq(cli_sessions_v2.kilo_user_id, kiloUserId) + ) + ) + .limit(1); + + if (!parentRows[0]) return null; + + const [row] = await db + .update(cli_sessions_v2) + .set({ parent_session_id: parentSessionId }) + .where( + and( + eq(cli_sessions_v2.session_id, sessionId), + eq(cli_sessions_v2.kilo_user_id, kiloUserId), + sql`${cli_sessions_v2.parent_session_id} IS DISTINCT FROM ${parentSessionId}` + ) + ) + .returning(sessionEventReturningColumns); + return row ?? null; + } + + if (parentSessionId === null) { + const [row] = await db + .update(cli_sessions_v2) + .set({ parent_session_id: null }) + .where( + and( + eq(cli_sessions_v2.session_id, sessionId), + eq(cli_sessions_v2.kilo_user_id, kiloUserId), + sql`${cli_sessions_v2.parent_session_id} IS DISTINCT FROM ${parentSessionId}` + ) + ) + .returning(sessionEventReturningColumns); + return row ?? null; + } + + return null; +} + +type StatusTransitionRow = SessionEventDbRow & { + previous_status: string | null; +}; + +async function updateSessionStatus( + db: ReturnType, + kiloUserId: string, + sessionId: string, + status: string | null | undefined +): Promise<{ session: SessionEventDbRow; previousStatus: SessionStatus | null } | null> { + if (status === undefined) return null; + + const { rows } = await db.execute(sql` + WITH locked AS ( + SELECT status AS previous_status + FROM cli_sessions_v2 + WHERE session_id = ${sessionId} + AND kilo_user_id = ${kiloUserId} + FOR UPDATE + ), updated AS ( + UPDATE cli_sessions_v2 + SET + status = ${status}, + status_updated_at = now(), + updated_at = now() + FROM locked + WHERE cli_sessions_v2.session_id = ${sessionId} + AND cli_sessions_v2.kilo_user_id = ${kiloUserId} + AND locked.previous_status IS DISTINCT FROM ${status} + RETURNING + cli_sessions_v2.session_id, + cli_sessions_v2.created_at, + cli_sessions_v2.updated_at, + cli_sessions_v2.title, + cli_sessions_v2.created_on_platform, + cli_sessions_v2.organization_id, + cli_sessions_v2.git_url, + cli_sessions_v2.git_branch, + cli_sessions_v2.parent_session_id, + cli_sessions_v2.status, + cli_sessions_v2.status_updated_at, + (SELECT previous_status FROM locked) AS previous_status + ) + SELECT * FROM updated + `); + const row = rows[0]; + if (!row) return null; + + return { + session: row, + previousStatus: SessionStatusSchema.nullable().parse(row.previous_status), + }; +} + async function applyMetadataChanges( env: Env, kiloUserId: string, @@ -462,119 +594,27 @@ async function applyMetadataChanges( const parentSessionId = mergedChanges.has('parentId') ? (mergedChanges.get('parentId') ?? null) : undefined; - const changedNonStatus = - mergedChanges.has('title') || - mergedChanges.has('platform') || - mergedChanges.has('orgId') || - mergedChanges.has('gitUrl') || - mergedChanges.has('gitBranch') || - parentSessionId !== undefined; - - const notification = await db.transaction(async tx => { - const statusChange = - status === undefined - ? { changed: false, previousStatus: null } - : await (async () => { - const [statusRow] = await tx - .select({ status: cli_sessions_v2.status }) - .from(cli_sessions_v2) - .where( - and( - eq(cli_sessions_v2.session_id, sessionId), - eq(cli_sessions_v2.kilo_user_id, kiloUserId) - ) - ) - .limit(1) - .for('update'); - if (!statusRow) return null; - const previousStatus = SessionStatusSchema.nullable().parse(statusRow.status); - return { changed: status !== previousStatus, previousStatus }; - })(); - - if (!statusChange) return null; - - if (Object.keys(updates).length > 0) { - await tx - .update(cli_sessions_v2) - .set(updates) - .where( - and( - eq(cli_sessions_v2.session_id, sessionId), - eq(cli_sessions_v2.kilo_user_id, kiloUserId) - ) - ); - } - - if (parentSessionId !== undefined) { - if (parentSessionId && parentSessionId !== sessionId) { - const parentRows = await tx - .select({ session_id: cli_sessions_v2.session_id }) - .from(cli_sessions_v2) - .where( - and( - eq(cli_sessions_v2.session_id, parentSessionId), - eq(cli_sessions_v2.kilo_user_id, kiloUserId) - ) - ) - .limit(1); - - if (parentRows[0]) { - await tx - .update(cli_sessions_v2) - .set({ parent_session_id: parentSessionId }) - .where( - and( - eq(cli_sessions_v2.session_id, sessionId), - eq(cli_sessions_v2.kilo_user_id, kiloUserId), - sql`${cli_sessions_v2.parent_session_id} IS DISTINCT FROM ${parentSessionId}` - ) - ); - } - } else if (parentSessionId === null) { - await tx - .update(cli_sessions_v2) - .set({ parent_session_id: null }) - .where( - and( - eq(cli_sessions_v2.session_id, sessionId), - eq(cli_sessions_v2.kilo_user_id, kiloUserId), - sql`${cli_sessions_v2.parent_session_id} IS DISTINCT FROM ${parentSessionId}` - ) - ); - } - } - - if (!changedNonStatus && !statusChange.changed) return null; - - const [persistedRow] = await tx - .select({ - session_id: cli_sessions_v2.session_id, - created_at: cli_sessions_v2.created_at, - updated_at: cli_sessions_v2.updated_at, - title: cli_sessions_v2.title, - created_on_platform: cli_sessions_v2.created_on_platform, - organization_id: cli_sessions_v2.organization_id, - git_url: cli_sessions_v2.git_url, - git_branch: cli_sessions_v2.git_branch, - parent_session_id: cli_sessions_v2.parent_session_id, - status: cli_sessions_v2.status, - status_updated_at: cli_sessions_v2.status_updated_at, - }) - .from(cli_sessions_v2) - .where( - and(eq(cli_sessions_v2.session_id, sessionId), eq(cli_sessions_v2.kilo_user_id, kiloUserId)) - ) - .limit(1); + let changedNonStatus = false; + let notificationRow = await updateSessionMetadata(db, kiloUserId, sessionId, updates); + if (notificationRow) changedNonStatus = true; + + const parentUpdateRow = await updateSessionParent(db, kiloUserId, sessionId, parentSessionId); + if (parentUpdateRow) { + changedNonStatus = true; + notificationRow = parentUpdateRow; + } - if (!persistedRow) return null; + const statusChange = await updateSessionStatus(db, kiloUserId, sessionId, status); + if (statusChange) notificationRow = statusChange.session; - return { - changedNonStatus, - changedStatus: statusChange.changed, - previousStatus: statusChange.previousStatus, - session: mapSessionEventRow(persistedRow), - }; - }); + const notification = notificationRow + ? { + changedNonStatus, + changedStatus: !!statusChange, + previousStatus: statusChange?.previousStatus ?? null, + session: mapSessionEventRow(notificationRow), + } + : null; if (!notification) return; if (notification.changedNonStatus) { diff --git a/services/session-ingest/src/types/user-connection-protocol.ts b/services/session-ingest/src/types/user-connection-protocol.ts index 702066f961..015fe5b24c 100644 --- a/services/session-ingest/src/types/user-connection-protocol.ts +++ b/services/session-ingest/src/types/user-connection-protocol.ts @@ -185,6 +185,7 @@ export type CLIOutboundMessage = z.infer; export type CLIInboundMessage = z.infer; export type WebOutboundMessage = z.infer; export type WebInboundMessage = z.infer; +export type SessionStatus = z.infer; export type SessionEventV2Row = z.infer; export type SessionRowEventPayload = z.infer; export type SessionStatusUpdatedPayload = z.infer;