Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 61 additions & 5 deletions services/cloud-agent-next/src/persistence/CloudAgentSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1717,28 +1717,74 @@ export class CloudAgentSession extends DurableObject<WorkerEnv> {
private async scheduleEphemeralSandboxDestroy(delayMs: number): Promise<void> {
const existing = await this.ctx.storage.get<number>(EPHEMERAL_SANDBOX_DESTROY_AFTER_KEY);
if (existing !== undefined) {
logger
.withFields({ sessionId: this.sessionId, destroyAfter: existing })
.info('Ephemeral sandbox destroy already scheduled; re-arming alarm');
await this.scheduleAlarmAtOrBefore(existing);
return;
}
const destroyAfter = Date.now() + delayMs;
logger
.withFields({ sessionId: this.sessionId, delayMs, destroyAfter })
.info('Scheduling ephemeral sandbox destroy');
await this.ctx.storage.put(EPHEMERAL_SANDBOX_DESTROY_AFTER_KEY, destroyAfter);
await this.scheduleAlarmAtOrBefore(destroyAfter);
}

private async destroyEphemeralSandboxIfReady(now: number): Promise<void> {
const destroyAfter = await this.ctx.storage.get<number>(EPHEMERAL_SANDBOX_DESTROY_AFTER_KEY);
if (destroyAfter === undefined || now < destroyAfter) return;
if (destroyAfter === undefined) return;
if (now < destroyAfter) {
logger
.withFields({ sessionId: this.sessionId, now, destroyAfter })
.debug('Ephemeral sandbox destroy not yet due');
return;
}
const metadata = await this.getMetadata();
if (!metadata || !isCodeReviewEphemeralSandboxId(metadata.workspace?.sandboxId)) {
logger
.withFields({
sessionId: this.sessionId,
hasMetadata: metadata !== null,
sandboxId: metadata?.workspace?.sandboxId,
})
.warn('Skipping ephemeral sandbox destroy: metadata missing or sandbox is not ephemeral');
await this.ctx.storage.delete(EPHEMERAL_SANDBOX_DESTROY_AFTER_KEY);
return;
}

if (this.ephemeralSandboxDestroyer) {
await this.ephemeralSandboxDestroyer();
} else {
await createAgentSandbox(this.env, metadata).delete('recovery');
logger
.withFields({
sessionId: this.sessionId,
sandboxId: metadata.workspace?.sandboxId,
now,
destroyAfter,
overdueMs: now - destroyAfter,
usingCustomDestroyer: this.ephemeralSandboxDestroyer !== undefined,
})
.info('Destroying ephemeral sandbox');

try {
if (this.ephemeralSandboxDestroyer) {
await this.ephemeralSandboxDestroyer();
} else {
await createAgentSandbox(this.env, metadata).delete('recovery');
}
} catch (error) {
logger
.withFields({
sessionId: this.sessionId,
sandboxId: metadata.workspace?.sandboxId,
error: error instanceof Error ? error.message : String(error),
stack: error instanceof Error ? error.stack : undefined,
})
.error('Ephemeral sandbox destroy failed');
throw error;
}

logger
.withFields({ sessionId: this.sessionId, sandboxId: metadata.workspace?.sandboxId })
.info('Ephemeral sandbox destroyed successfully');
await this.ctx.storage.delete(EPHEMERAL_SANDBOX_DESTROY_AFTER_KEY);
await this.ctx.storage.put(EPHEMERAL_SANDBOX_DESTROYED_AT_KEY, Date.now());
}
Expand Down Expand Up @@ -3162,6 +3208,16 @@ export class CloudAgentSession extends DurableObject<WorkerEnv> {
const metadata = await this.getMetadata();
if (!metadata) return;
if (!isCodeReviewEphemeralSandboxId(metadata.workspace?.sandboxId)) return;
logger
.withFields({
sessionId: this.sessionId,
sandboxId: metadata.workspace?.sandboxId,
status: params.status,
wrapperRunId: params.wrapperRunId,
})
.info(
'Wrapper terminal event on ephemeral code-review sandbox; forcing stop and scheduling destroy'
);
await this.getWrapperSupervisor().requestPhysicalWrapperStop('terminal-ended', {
kind: 'session',
});
Expand Down
49 changes: 47 additions & 2 deletions services/cloud-agent-next/src/session/wrapper-supervisor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -695,11 +695,41 @@ export function createWrapperSupervisor(
return (await listNonTerminalAcceptedMessages(storage, state.wrapperRunId)).length > 0;
}

/**
* The liveness guards below silently stop tracking a wrapper run once they decide there's
* no active work for it. If a message is still sitting non-terminal/accepted somewhere (for
* this run or a stale one), that's the watchdog going dark on a message it should still be
* supervising — surface it, since neither guard branch otherwise leaves a trace.
*/
async function warnIfLivenessGuardOrphansAcceptedMessage(
guard: 'missing_wrapper_connection' | 'no_active_wrapper_work',
state: WrapperRuntimeState
): Promise<void> {
const orphaned = await listNonTerminalAcceptedMessages(storage);
if (orphaned.length === 0) return;
logger
.withFields({
sessionId: getSessionIdForLogs(),
guard,
wrapperRunId: state.wrapperRunId,
wrapperGeneration: state.wrapperGeneration,
wrapperConnectionId: state.wrapperConnectionId,
orphanedMessageIds: orphaned.map(message => message.messageId),
})
.warn(
'Wrapper liveness watchdog stopped tracking while accepted messages remain non-terminal'
);
}

async function getNextWrapperLivenessDeadline(): Promise<number | undefined> {
const state = await getWrapperRuntimeState(storage);
if (!state.wrapperConnectionId) return undefined;
if (!state.wrapperConnectionId) {
await warnIfLivenessGuardOrphansAcceptedMessage('missing_wrapper_connection', state);
return undefined;
}

if (!(await hasActiveWrapperWork(state))) {
await warnIfLivenessGuardOrphansAcceptedMessage('no_active_wrapper_work', state);
const hasLivenessFields =
state.noOutputDeadlineAt !== undefined ||
state.pingDeadlineAt !== undefined ||
Expand All @@ -726,9 +756,13 @@ export function createWrapperSupervisor(
state.noOutputDeadlineAt !== undefined ||
state.pingDeadlineAt !== undefined ||
state.nextPingAt !== undefined;
if (!hasLivenessDeadline || !state.wrapperConnectionId) return false;
if (!hasLivenessDeadline || !state.wrapperConnectionId) {
await warnIfLivenessGuardOrphansAcceptedMessage('missing_wrapper_connection', state);
return false;
}

if (!(await hasActiveWrapperWork(state))) {
await warnIfLivenessGuardOrphansAcceptedMessage('no_active_wrapper_work', state);
await clearCurrentWrapperRuntimeLivenessState(
storage,
state.wrapperGeneration,
Expand Down Expand Up @@ -1153,6 +1187,17 @@ export function createWrapperSupervisor(
await putWrapperLease(storage, stopping);
await requestAlarmAtOrBefore?.(stopping.attemptDeadlineAt);

logger
.withFields({
sessionId: getSessionIdForLogs(),
reason: stopping.reason,
target: stopping.target,
attemptId,
attempts: stopping.attempts,
requestedAt: stopping.requestedAt,
})
.info('Reconciling physical wrapper stop');

let result: StopWrappersResult;
try {
result = await stopWrappers({
Expand Down