From fee525442d6b09bd4936cdafba938c3a30bc2cae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Igor=20=C5=A0=C4=87eki=C4=87?= Date: Fri, 3 Jul 2026 17:49:35 +0200 Subject: [PATCH] fix(cloud-agent-sdk): upgrade read-only CLI sessions to live when the CLI reports them active resolveSession decides remote vs read-only from activeSessions.list, which reflects CLI heartbeats and is eventually consistent: after enabling remote on the CLI it takes a websocket connect plus a heartbeat round-trip (~12s measured locally) before the session appears active. Opening the session in the app inside that window resolved it read-only, which mounted the snapshot-only historical transport and never re-evaluated - the view stayed frozen on whatever the snapshot contained (for a just-started /local-review-uncommitted run, only the first message). When a session resolves read-only and a user-web connection is available, keep the connection retained and watch sessions.list/sessions.heartbeat; as soon as the CLI reports the session active, re-resolve and swap in the live transport. This also covers resolveSession's silent failure paths (transient tRPC errors map to read-only), which previously froze the screen the same way. --- .../cloud-agent-sdk/session-routing.test.ts | 123 ++++++++++++++++++ apps/web/src/lib/cloud-agent-sdk/session.ts | 57 +++++++- 2 files changed, 173 insertions(+), 7 deletions(-) diff --git a/apps/web/src/lib/cloud-agent-sdk/session-routing.test.ts b/apps/web/src/lib/cloud-agent-sdk/session-routing.test.ts index ede30aaab7..98165f355b 100644 --- a/apps/web/src/lib/cloud-agent-sdk/session-routing.test.ts +++ b/apps/web/src/lib/cloud-agent-sdk/session-routing.test.ts @@ -201,6 +201,129 @@ describe('session transport routing', () => { }); }); + describe('read-only session upgrade', () => { + function makeFakeUserWebConnection() { + const systemListeners = new Set<(event: { event: string; data: unknown }) => void>(); + const releaseRetain = jest.fn(); + const offSystemEvent = jest.fn(); + const subscribeRelease = jest.fn(); + return { + connection: { + retain: jest.fn(() => releaseRetain), + connect: jest.fn(), + disconnect: jest.fn(), + destroy: jest.fn(), + subscribeToCliSession: jest.fn(() => subscribeRelease), + sendCommand: jest.fn(() => Promise.resolve()), + onCliEvent: jest.fn(() => jest.fn()), + onSystemEvent: jest.fn((listener: (event: { event: string; data: unknown }) => void) => { + systemListeners.add(listener); + return () => { + offSystemEvent(); + systemListeners.delete(listener); + }; + }), + onReconnect: jest.fn(() => jest.fn()), + onSessionEvent: jest.fn(() => jest.fn()), + }, + emitSystemEvent(event: string, data: unknown) { + for (const listener of systemListeners) listener({ event, data }); + }, + releaseRetain, + offSystemEvent, + }; + } + + it('re-resolves to remote when a CLI heartbeat reports the session as active', async () => { + const snapshot = makeSnapshot({ id: SES_ID }, [ + { + info: stubUserMessage({ id: 'msg-1', sessionID: SES_ID }), + parts: [ + stubTextPart({ id: 'part-1', messageID: 'msg-1', sessionID: SES_ID, text: 'hi' }), + ], + }, + ]); + + const resolveSession = jest + .fn, [unknown]>() + .mockResolvedValueOnce({ type: 'read-only', kiloSessionId: kiloId(SES_ID) }) + .mockResolvedValue({ type: 'remote', kiloSessionId: kiloId(SES_ID) }); + + const fake = makeFakeUserWebConnection(); + + const session = createCloudAgentSession({ + kiloSessionId: kiloId(SES_ID), + resolveSession, + transport: { + fetchSnapshot: jest.fn(() => Promise.resolve(snapshot)), + userWebConnection: fake.connection, + }, + }); + + session.connect(); + await Promise.resolve(); // resolveSession resolves + await Promise.resolve(); // fetchSnapshot resolves + + expect(resolveSession).toHaveBeenCalledTimes(1); + expect(fake.connection.retain).toHaveBeenCalledTimes(1); + expect(session.storage.getMessageIds()).toContain('msg-1'); + + // A heartbeat for a different session must not trigger a re-resolve. + fake.emitSystemEvent('sessions.heartbeat', { + connectionId: 'conn-1', + sessions: [{ id: 'ses-other', status: 'busy', title: 'Other' }], + }); + expect(resolveSession).toHaveBeenCalledTimes(1); + + // The watched session shows up in a heartbeat → upgrade to live. + fake.emitSystemEvent('sessions.heartbeat', { + connectionId: 'conn-1', + sessions: [{ id: SES_ID, status: 'busy', title: 'Review' }], + }); + await Promise.resolve(); // second resolveSession resolves + + expect(resolveSession).toHaveBeenCalledTimes(2); + expect(fake.connection.subscribeToCliSession).toHaveBeenCalledWith(SES_ID); + // Watcher is disarmed once the upgrade kicks off. + expect(fake.offSystemEvent).toHaveBeenCalledTimes(1); + expect(fake.releaseRetain).toHaveBeenCalledTimes(1); + + session.destroy(); + }); + + it('disarms the watcher on destroy without re-resolving', async () => { + const resolveSession = jest.fn( + (): Promise => + Promise.resolve({ type: 'read-only', kiloSessionId: kiloId(SES_ID) }) + ); + + const fake = makeFakeUserWebConnection(); + + const session = createCloudAgentSession({ + kiloSessionId: kiloId(SES_ID), + resolveSession, + transport: { + fetchSnapshot: jest.fn(() => Promise.resolve(makeSnapshot({ id: SES_ID }, []))), + userWebConnection: fake.connection, + }, + }); + + session.connect(); + await Promise.resolve(); + await Promise.resolve(); + + session.destroy(); + expect(fake.offSystemEvent).toHaveBeenCalledTimes(1); + expect(fake.releaseRetain).toHaveBeenCalledTimes(1); + + fake.emitSystemEvent('sessions.heartbeat', { + connectionId: 'conn-1', + sessions: [{ id: SES_ID, status: 'busy', title: 'Review' }], + }); + expect(resolveSession).toHaveBeenCalledTimes(1); + }); + }); + // NOTE: The old "completed Cloud Agent session" case (cloudAgentSessionId present // but isLive=false) no longer exists. With the discriminated union, the resolver // decides the session type. A completed cloud agent session is resolved as diff --git a/apps/web/src/lib/cloud-agent-sdk/session.ts b/apps/web/src/lib/cloud-agent-sdk/session.ts index 4c75714f80..60d06703f3 100644 --- a/apps/web/src/lib/cloud-agent-sdk/session.ts +++ b/apps/web/src/lib/cloud-agent-sdk/session.ts @@ -11,6 +11,7 @@ import type { Images } from '@/lib/images-schema'; import type { NormalizedEvent } from './normalizer'; import type { SuggestionAction } from './types'; import { createChatProcessor } from './chat-processor'; +import { heartbeatDataSchema, sessionsListDataSchema } from './schemas'; import { createServiceState } from './service-state'; import type { ServiceState } from './service-state'; import { createCloudAgentTransport } from './cloud-agent-transport'; @@ -173,6 +174,46 @@ function createCloudAgentSession(config: CloudAgentSessionConfig): CloudAgentSes let transport: Transport | null = null; let connectGeneration = 0; + let disarmUpgradeWatcher: (() => void) | null = null; + + // A session resolves to 'read-only' when the CLI hasn't (yet) reported it as + // active — but that signal is eventually consistent: enabling remote on the + // CLI takes a connect + heartbeat round-trip to register. Without this + // watcher, opening the session inside that window pins it to the static + // historical snapshot forever. Watch the user-web connection for the session + // showing up in a CLI heartbeat/list and re-resolve to upgrade to live. + function armUpgradeWatcher(): void { + const connection = config.transport.userWebConnection; + if (!connection) return; + + // Keep the socket alive while watching — nothing else retains it for a + // read-only session, and without it no heartbeats arrive. + const release = connection.retain?.(); + const off = connection.onSystemEvent(({ event, data }) => { + if (event !== 'sessions.list' && event !== 'sessions.heartbeat') return; + const schema = event === 'sessions.list' ? sessionsListDataSchema : heartbeatDataSchema; + const parsed = schema.safeParse(data); + if (!parsed.success) return; + if (!parsed.data.sessions.some(session => session.id === config.kiloSessionId)) return; + connectInternal(); + }); + disarmUpgradeWatcher = () => { + off(); + release?.(); + }; + } + + function connectInternal(): void { + disarmUpgradeWatcher?.(); + disarmUpgradeWatcher = null; + if (transport) { + transport.destroy(); + transport = null; + } + connectGeneration += 1; + serviceState.setActivity({ type: 'connecting' }); + void resolveAndConnect(connectGeneration); + } const sink: TransportSink = { onChatEvent(event) { @@ -291,6 +332,10 @@ function createCloudAgentSession(config: CloudAgentSessionConfig): CloudAgentSes transport = factory(sink); transport.connect(); + + if (resolved.type === 'read-only') { + armUpgradeWatcher(); + } } return { @@ -367,15 +412,11 @@ function createCloudAgentSession(config: CloudAgentSessionConfig): CloudAgentSes return transport?.interrupt !== undefined; }, connect() { - if (transport) { - transport.destroy(); - transport = null; - } - connectGeneration += 1; - serviceState.setActivity({ type: 'connecting' }); - void resolveAndConnect(connectGeneration); + connectInternal(); }, disconnect() { + disarmUpgradeWatcher?.(); + disarmUpgradeWatcher = null; connectGeneration += 1; if (transport) { transport.disconnect(); @@ -383,6 +424,8 @@ function createCloudAgentSession(config: CloudAgentSessionConfig): CloudAgentSes } }, destroy() { + disarmUpgradeWatcher?.(); + disarmUpgradeWatcher = null; connectGeneration += 1; if (transport) { transport.destroy();