Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 92 additions & 66 deletions services/session-ingest/src/queue-consumer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,26 @@ import {

const encoder = new TextEncoder();

function mockDbWithSessionGuard(overrides: Record<string, unknown> = {}) {
const limit = vi.fn(async () => [{ session_id: 'ses_exists' }]);
const where = vi.fn(() => ({ limit }));
const from = vi.fn(() => ({ where }));
const select = vi.fn(() => ({ from }));
return { select, ...overrides };
}

function createUpdateReturningMock(rows: unknown[] = []) {
const updateQuery = {
set: vi.fn(() => updateQuery),
where: vi.fn(() => updateQuery),
returning: vi.fn(async () => rows),
};
return {
update: vi.fn(() => updateQuery),
updateQuery,
};
}

function feedAll(extractor: ReturnType<typeof createItemExtractor>, json: string) {
extractor.tokenizer.write(encoder.encode(json));
extractor.tokenizer.end();
Expand Down Expand Up @@ -382,12 +402,8 @@ describe('queue', () => {
);
vi.mocked(getSessionIngestDO).mockReturnValue({ ingest } as never);

const transaction = vi.fn(async () => null);
const limit = vi.fn(async () => [{ session_id: 'ses_split' }]);
const where = vi.fn(() => ({ limit }));
const from = vi.fn(() => ({ where }));
const select = vi.fn(() => ({ from }));
vi.mocked(getWorkerDb).mockReturnValue({ select, transaction } as never);
const { update } = createUpdateReturningMock();
vi.mocked(getWorkerDb).mockReturnValue(mockDbWithSessionGuard({ update }) as never);

const items = Array.from({ length: 129 }, (_, i) => ({
type: 'message',
Expand Down Expand Up @@ -445,7 +461,7 @@ describe('queue', () => {
1,
undefined
);
expect(transaction).toHaveBeenCalledTimes(1);
expect(update).toHaveBeenCalledTimes(1);
expect(deleteObject).toHaveBeenCalledWith('staging/split');
expect(ack).toHaveBeenCalledTimes(1);
expect(retry).not.toHaveBeenCalled();
Expand Down Expand Up @@ -563,12 +579,8 @@ describe('queue', () => {
const ingest = vi.fn(async () => ({ changes: [] }));
vi.mocked(getSessionIngestDO).mockReturnValue({ ingest } as never);

const transaction = vi.fn(async () => null);
const limit = vi.fn(async () => [{ session_id: 'ses_malformed' }]);
const where = vi.fn(() => ({ limit }));
const from = vi.fn(() => ({ where }));
const select = vi.fn(() => ({ from }));
vi.mocked(getWorkerDb).mockReturnValue({ select, transaction } as never);
const { update } = createUpdateReturningMock();
vi.mocked(getWorkerDb).mockReturnValue(mockDbWithSessionGuard({ update }) as never);

const body = '{"data":[{"type":"message","data":{"id":"msg_1"}},broken';
const deleteObject = vi.fn(async () => undefined);
Expand Down Expand Up @@ -604,7 +616,7 @@ describe('queue', () => {
);

expect(ingest).not.toHaveBeenCalled();
expect(transaction).not.toHaveBeenCalled();
expect(update).not.toHaveBeenCalled();
expect(retry).toHaveBeenCalledWith({ delaySeconds: QUEUE_RETRY_DELAY_SECONDS });
expect(ack).not.toHaveBeenCalled();
expect(deleteObject).not.toHaveBeenCalled();
Expand All @@ -623,13 +635,9 @@ describe('queue', () => {
});
vi.mocked(getSessionIngestDO).mockReturnValue({ ingest } as never);

// db.select powers the session-exists guard; db.transaction is the metadata flush.
const transaction = vi.fn(async () => null);
const limit = vi.fn(async () => [{ session_id: 'ses_partial' }]);
const where = vi.fn(() => ({ limit }));
const from = vi.fn(() => ({ where }));
const select = vi.fn(() => ({ from }));
vi.mocked(getWorkerDb).mockReturnValue({ select, transaction } as never);
// db.select powers the session-exists guard; db.update is the metadata flush.
const { update } = createUpdateReturningMock();
vi.mocked(getWorkerDb).mockReturnValue(mockDbWithSessionGuard({ update }) as never);

// 129 items -> chunk 1 = 128 items (flush succeeds), chunk 2 = 1 item (flush throws).
const items = Array.from({ length: 129 }, (_, i) => ({
Expand Down Expand Up @@ -673,7 +681,7 @@ describe('queue', () => {
const firstChunkItems = (ingest.mock.calls[0] as unknown[])[0];
expect(firstChunkItems).toHaveLength(128);
// Its metadata change was flushed to Postgres despite the later failure.
expect(transaction).toHaveBeenCalledTimes(1);
expect(update).toHaveBeenCalledTimes(1);
// The message is retried and the staging object is preserved for reprocessing.
expect(retry).toHaveBeenCalledWith({ delaySeconds: QUEUE_RETRY_DELAY_SECONDS });
expect(ack).not.toHaveBeenCalled();
Expand All @@ -682,36 +690,30 @@ describe('queue', () => {
});

describe('computeSessionMetadataUpdates', () => {
const fixedNow = () => '2026-05-05T00:00:00.000Z';

it('normalizes gitUrl to the canonical form before persisting', () => {
const updates = computeSessionMetadataUpdates(
new Map([['gitUrl', 'https://GitHub.com/ACME/Widgets.git']]),
fixedNow
new Map([['gitUrl', 'https://GitHub.com/ACME/Widgets.git']])
);
expect(updates.git_url).toBe('https://github.com/acme/widgets');
});

it('collapses scp-style and ssh:// URLs to the same normalized form as https', () => {
const fromScp = computeSessionMetadataUpdates(
new Map([['gitUrl', 'git@github.com:acme/widgets.git']]),
fixedNow
new Map([['gitUrl', 'git@github.com:acme/widgets.git']])
);
const fromSsh = computeSessionMetadataUpdates(
new Map([['gitUrl', 'ssh://git@github.com/acme/widgets.git']]),
fixedNow
new Map([['gitUrl', 'ssh://git@github.com/acme/widgets.git']])
);
const fromHttps = computeSessionMetadataUpdates(
new Map([['gitUrl', 'https://github.com/acme/widgets']]),
fixedNow
new Map([['gitUrl', 'https://github.com/acme/widgets']])
);
expect(fromScp.git_url).toBe('https://github.com/acme/widgets');
expect(fromSsh.git_url).toBe(fromScp.git_url);
expect(fromHttps.git_url).toBe(fromScp.git_url);
});

it('writes null git_url when the ingest cleared the field', () => {
const updates = computeSessionMetadataUpdates(new Map([['gitUrl', null]]), fixedNow);
const updates = computeSessionMetadataUpdates(new Map([['gitUrl', null]]));
expect(updates.git_url).toBeNull();
});

Expand All @@ -720,22 +722,21 @@ describe('computeSessionMetadataUpdates', () => {
new Map([
['gitBranch', 'feature/x'],
['title', 'hello'],
]),
fixedNow
])
);
expect('git_url' in updates).toBe(false);
expect(updates.git_branch).toBe('feature/x');
expect(updates.title).toBe('hello');
});

it('stamps status_updated_at when status changes', () => {
const updates = computeSessionMetadataUpdates(new Map([['status', 'running']]), fixedNow);
expect(updates.status).toBe('running');
expect(updates.status_updated_at).toBe('2026-05-05T00:00:00.000Z');
it('does not include status fields in the generic metadata update', () => {
const updates = computeSessionMetadataUpdates(new Map([['status', 'running']]));
expect('status' in updates).toBe(false);
expect('status_updated_at' in updates).toBe(false);
});

it('ignores a null "platform" change (creation value stays sticky)', () => {
const updates = computeSessionMetadataUpdates(new Map([['platform', null]]), fixedNow);
const updates = computeSessionMetadataUpdates(new Map([['platform', null]]));
expect('created_on_platform' in updates).toBe(false);
});
});
Expand All @@ -755,33 +756,10 @@ describe('queue status notifications', () => {
parent_session_id: null,
status: 'idle',
status_updated_at: '2026-05-05T00:00:01.000Z',
previous_status: 'busy',
};
const selectResults: unknown[][] = [
[{ session_id: persistedSession.session_id, status: 'idle' }],
[{ status: 'busy' }],
[persistedSession],
];
const selectResult = vi.fn(async () => selectResults.shift() ?? []);
const select = {
from: vi.fn(() => select),
where: vi.fn(() => select),
limit: vi.fn(() => select),
for: vi.fn(() => select),
then: vi.fn((resolve: (value: unknown) => unknown) => resolve(selectResult())),
};
const update = {
set: vi.fn(() => update),
where: vi.fn(() => update),
then: vi.fn((resolve: (value: undefined) => unknown) => resolve(undefined)),
};
const dbRef: Record<string, unknown> = {};
const db = {
select: vi.fn(() => select),
update: vi.fn(() => update),
transaction: vi.fn(async (fn: (tx: unknown) => Promise<unknown>) => fn(dbRef)),
} as unknown as ReturnType<typeof getWorkerDb>;
Object.assign(dbRef, db);
vi.mocked(getWorkerDb).mockReturnValue(db);
const execute = vi.fn(async () => ({ rows: [persistedSession] }));
vi.mocked(getWorkerDb).mockReturnValue(mockDbWithSessionGuard({ execute }) as never);
vi.mocked(getSessionIngestDO).mockReturnValue({
ingest: vi.fn(async () => ({ changes: [{ name: 'status', value: 'idle' }] })),
} as never);
Expand Down Expand Up @@ -825,6 +803,7 @@ describe('queue status notifications', () => {
await queue(batch, env, ctx);

expect(ack).toHaveBeenCalledTimes(1);
expect(execute).toHaveBeenCalledTimes(1);
expect(notifyUserSessionEvent).toHaveBeenCalledWith(
env,
'usr_test',
Expand All @@ -835,4 +814,51 @@ describe('queue status notifications', () => {
ctx
);
});

it('does not notify or stamp timestamps when the locked status is unchanged', async () => {
vi.mocked(notifyUserSessionEvent).mockClear();
const execute = vi.fn(async () => ({ rows: [] }));
vi.mocked(getWorkerDb).mockReturnValue(mockDbWithSessionGuard({ execute }) as never);
vi.mocked(getSessionIngestDO).mockReturnValue({
ingest: vi.fn(async () => ({ changes: [{ name: 'status', value: 'idle' }] })),
} as never);

const body = JSON.stringify({ data: [{ type: 'session_status', data: { status: 'idle' } }] });
const env = {
HYPERDRIVE: { connectionString: 'postgres://test' },
SESSION_INGEST_R2: {
get: vi.fn(async () => new Response(body)),
delete: vi.fn(async () => undefined),
put: vi.fn(async () => undefined),
},
} as never;
const ack = vi.fn();
const retry = vi.fn();
const ctx = { waitUntil: vi.fn() } as unknown as ExecutionContext;

await queue(
{
messages: [
{
body: {
r2Key: 'ingest/status-noop',
kiloUserId: 'usr_test',
sessionId: 'ses_12345678901234567890123456',
ingestVersion: 1,
ingestedAt: 1,
},
ack,
retry,
},
],
} as never,
env,
ctx
);

expect(ack).toHaveBeenCalledTimes(1);
expect(retry).not.toHaveBeenCalled();
expect(execute).toHaveBeenCalledTimes(1);
expect(notifyUserSessionEvent).not.toHaveBeenCalled();
});
});
Loading
Loading