Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 123 additions & 0 deletions apps/web/src/lib/cloud-agent-sdk/session-routing.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,129 @@ describe('session transport routing', () => {
});
});

describe('read-only session upgrade', () => {
function makeFakeUserWebConnection() {
const systemListeners = new Set<(event: { event: string; data: unknown }) => void>();
const releaseRetain = jest.fn();
const offSystemEvent = jest.fn();
const subscribeRelease = jest.fn();
return {
connection: {
retain: jest.fn(() => releaseRetain),
connect: jest.fn(),
disconnect: jest.fn(),
destroy: jest.fn(),
subscribeToCliSession: jest.fn(() => subscribeRelease),
sendCommand: jest.fn(() => Promise.resolve()),
onCliEvent: jest.fn(() => jest.fn()),
onSystemEvent: jest.fn((listener: (event: { event: string; data: unknown }) => void) => {
systemListeners.add(listener);
return () => {
offSystemEvent();
systemListeners.delete(listener);
};
}),
onReconnect: jest.fn(() => jest.fn()),
onSessionEvent: jest.fn(() => jest.fn()),
},
emitSystemEvent(event: string, data: unknown) {
for (const listener of systemListeners) listener({ event, data });
},
releaseRetain,
offSystemEvent,
};
}

it('re-resolves to remote when a CLI heartbeat reports the session as active', async () => {
const snapshot = makeSnapshot({ id: SES_ID }, [
{
info: stubUserMessage({ id: 'msg-1', sessionID: SES_ID }),
parts: [
stubTextPart({ id: 'part-1', messageID: 'msg-1', sessionID: SES_ID, text: 'hi' }),
],
},
]);

const resolveSession = jest
.fn<Promise<ResolvedSession>, [unknown]>()
.mockResolvedValueOnce({ type: 'read-only', kiloSessionId: kiloId(SES_ID) })
.mockResolvedValue({ type: 'remote', kiloSessionId: kiloId(SES_ID) });

const fake = makeFakeUserWebConnection();

const session = createCloudAgentSession({
kiloSessionId: kiloId(SES_ID),
resolveSession,
transport: {
fetchSnapshot: jest.fn(() => Promise.resolve(snapshot)),
userWebConnection: fake.connection,
},
});

session.connect();
await Promise.resolve(); // resolveSession resolves
await Promise.resolve(); // fetchSnapshot resolves

expect(resolveSession).toHaveBeenCalledTimes(1);
expect(fake.connection.retain).toHaveBeenCalledTimes(1);
expect(session.storage.getMessageIds()).toContain('msg-1');

// A heartbeat for a different session must not trigger a re-resolve.
fake.emitSystemEvent('sessions.heartbeat', {
connectionId: 'conn-1',
sessions: [{ id: 'ses-other', status: 'busy', title: 'Other' }],
});
expect(resolveSession).toHaveBeenCalledTimes(1);

// The watched session shows up in a heartbeat → upgrade to live.
fake.emitSystemEvent('sessions.heartbeat', {
connectionId: 'conn-1',
sessions: [{ id: SES_ID, status: 'busy', title: 'Review' }],
});
await Promise.resolve(); // second resolveSession resolves

expect(resolveSession).toHaveBeenCalledTimes(2);
expect(fake.connection.subscribeToCliSession).toHaveBeenCalledWith(SES_ID);
// Watcher is disarmed once the upgrade kicks off.
expect(fake.offSystemEvent).toHaveBeenCalledTimes(1);
expect(fake.releaseRetain).toHaveBeenCalledTimes(1);

session.destroy();
});

it('disarms the watcher on destroy without re-resolving', async () => {
const resolveSession = jest.fn(
(): Promise<ResolvedSession> =>
Promise.resolve({ type: 'read-only', kiloSessionId: kiloId(SES_ID) })
);

const fake = makeFakeUserWebConnection();

const session = createCloudAgentSession({
kiloSessionId: kiloId(SES_ID),
resolveSession,
transport: {
fetchSnapshot: jest.fn(() => Promise.resolve(makeSnapshot({ id: SES_ID }, []))),
userWebConnection: fake.connection,
},
});

session.connect();
await Promise.resolve();
await Promise.resolve();

session.destroy();
expect(fake.offSystemEvent).toHaveBeenCalledTimes(1);
expect(fake.releaseRetain).toHaveBeenCalledTimes(1);

fake.emitSystemEvent('sessions.heartbeat', {
connectionId: 'conn-1',
sessions: [{ id: SES_ID, status: 'busy', title: 'Review' }],
});
expect(resolveSession).toHaveBeenCalledTimes(1);
});
});

// NOTE: The old "completed Cloud Agent session" case (cloudAgentSessionId present
// but isLive=false) no longer exists. With the discriminated union, the resolver
// decides the session type. A completed cloud agent session is resolved as
Expand Down
57 changes: 50 additions & 7 deletions apps/web/src/lib/cloud-agent-sdk/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import type { Images } from '@/lib/images-schema';
import type { NormalizedEvent } from './normalizer';
import type { SuggestionAction } from './types';
import { createChatProcessor } from './chat-processor';
import { heartbeatDataSchema, sessionsListDataSchema } from './schemas';
import { createServiceState } from './service-state';
import type { ServiceState } from './service-state';
import { createCloudAgentTransport } from './cloud-agent-transport';
Expand Down Expand Up @@ -173,6 +174,46 @@ function createCloudAgentSession(config: CloudAgentSessionConfig): CloudAgentSes

let transport: Transport | null = null;
let connectGeneration = 0;
let disarmUpgradeWatcher: (() => void) | null = null;

// A session resolves to 'read-only' when the CLI hasn't (yet) reported it as
// active — but that signal is eventually consistent: enabling remote on the
// CLI takes a connect + heartbeat round-trip to register. Without this
// watcher, opening the session inside that window pins it to the static
// historical snapshot forever. Watch the user-web connection for the session
// showing up in a CLI heartbeat/list and re-resolve to upgrade to live.
function armUpgradeWatcher(): void {
const connection = config.transport.userWebConnection;
if (!connection) return;

// Keep the socket alive while watching — nothing else retains it for a
// read-only session, and without it no heartbeats arrive.
const release = connection.retain?.();
const off = connection.onSystemEvent(({ event, data }) => {
if (event !== 'sessions.list' && event !== 'sessions.heartbeat') return;
const schema = event === 'sessions.list' ? sessionsListDataSchema : heartbeatDataSchema;
const parsed = schema.safeParse(data);
if (!parsed.success) return;
if (!parsed.data.sessions.some(session => session.id === config.kiloSessionId)) return;
connectInternal();
});
disarmUpgradeWatcher = () => {
off();
release?.();
};
}

function connectInternal(): void {
disarmUpgradeWatcher?.();
disarmUpgradeWatcher = null;
if (transport) {
transport.destroy();
transport = null;
}
connectGeneration += 1;
serviceState.setActivity({ type: 'connecting' });
void resolveAndConnect(connectGeneration);
}

const sink: TransportSink = {
onChatEvent(event) {
Expand Down Expand Up @@ -291,6 +332,10 @@ function createCloudAgentSession(config: CloudAgentSessionConfig): CloudAgentSes

transport = factory(sink);
transport.connect();

if (resolved.type === 'read-only') {
armUpgradeWatcher();
Comment thread
iscekic marked this conversation as resolved.
}
}

return {
Expand Down Expand Up @@ -367,22 +412,20 @@ function createCloudAgentSession(config: CloudAgentSessionConfig): CloudAgentSes
return transport?.interrupt !== undefined;
},
connect() {
if (transport) {
transport.destroy();
transport = null;
}
connectGeneration += 1;
serviceState.setActivity({ type: 'connecting' });
void resolveAndConnect(connectGeneration);
connectInternal();
},
disconnect() {
disarmUpgradeWatcher?.();
disarmUpgradeWatcher = null;
connectGeneration += 1;
if (transport) {
transport.disconnect();
transport = null;
}
},
destroy() {
disarmUpgradeWatcher?.();
disarmUpgradeWatcher = null;
connectGeneration += 1;
if (transport) {
transport.destroy();
Expand Down