Skip to content

Commit ceaaadd

Browse files
committed
feat: sort telemetry exports by timestamp
1 parent 2db4027 commit ceaaadd

4 files changed

Lines changed: 180 additions & 42 deletions

File tree

src/telemetry/export/files.ts

Lines changed: 91 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import {
1313
import {
1414
fileDateCanContainRangeEvent,
1515
isTimestampInRange,
16+
parseTelemetryTimestampMs,
1617
type TelemetryDateRange,
1718
} from "./range";
1819

@@ -25,7 +26,18 @@ interface TelemetryLogFile {
2526
readonly part: number;
2627
}
2728

28-
/** Log files that could contain events in `range`, in chronological order. */
29+
interface TelemetryEventEntry {
30+
readonly event: TelemetryEvent;
31+
readonly file: TelemetryLogFile;
32+
readonly lineNumber: number;
33+
}
34+
35+
interface EventCursor {
36+
readonly entry: TelemetryEventEntry;
37+
readonly iterator: AsyncIterator<TelemetryEventEntry>;
38+
}
39+
40+
/** Log files whose dates could overlap `range`. */
2941
export async function listTelemetryFilesForRange(
3042
telemetryDir: string,
3143
range: TelemetryDateRange,
@@ -41,7 +53,7 @@ export async function listTelemetryFilesForRange(
4153
}
4254

4355
return names
44-
.map((name) => parseLogFilename(telemetryDir, name))
56+
.map((name) => parseLogFilePath(path.join(telemetryDir, name)))
4557
.filter(
4658
(file): file is TelemetryLogFile =>
4759
file !== undefined && fileDateCanContainRangeEvent(file.date, range),
@@ -50,17 +62,44 @@ export async function listTelemetryFilesForRange(
5062
.map(({ path: filePath }) => filePath);
5163
}
5264

53-
/**
54-
* Yields events from `filePaths` in order, keeping only those whose timestamp
55-
* falls inside `range`. Reads line-by-line so memory stays flat on big files.
56-
*/
57-
export async function* streamTelemetryEvents(
65+
/** Merge per-session append streams by timestamp, buffering one event per session. */
66+
export async function* streamTelemetryEventsSorted(
5867
filePaths: readonly string[],
5968
range: TelemetryDateRange,
6069
): AsyncIterable<TelemetryEvent> {
61-
for (const filePath of filePaths) {
62-
const name = path.basename(filePath);
63-
const stream = createReadStream(filePath, { encoding: "utf8" });
70+
const frontier: EventCursor[] = [];
71+
for (const files of groupLogFilesBySession(filePaths)) {
72+
const iterator = streamTelemetryEventEntries(files, range)[
73+
Symbol.asyncIterator
74+
]();
75+
const next = await iterator.next();
76+
if (!next.done) {
77+
frontier.push({ entry: next.value, iterator });
78+
}
79+
}
80+
81+
while (frontier.length > 0) {
82+
frontier.sort((a, b) => compareEventEntries(a.entry, b.entry));
83+
const cursor = frontier.shift();
84+
if (!cursor) {
85+
return;
86+
}
87+
yield cursor.entry.event;
88+
89+
const next = await cursor.iterator.next();
90+
if (!next.done) {
91+
frontier.push({ entry: next.value, iterator: cursor.iterator });
92+
}
93+
}
94+
}
95+
96+
async function* streamTelemetryEventEntries(
97+
files: readonly TelemetryLogFile[],
98+
range: TelemetryDateRange,
99+
): AsyncIterable<TelemetryEventEntry> {
100+
for (const file of files) {
101+
const name = path.basename(file.path);
102+
const stream = createReadStream(file.path, { encoding: "utf8" });
64103
const lines = readline.createInterface({
65104
input: stream,
66105
crlfDelay: Infinity,
@@ -74,7 +113,7 @@ export async function* streamTelemetryEvents(
74113
}
75114
const event = parseTelemetryEventLine(line, name, lineNumber);
76115
if (isTimestampInRange(event.timestamp, range)) {
77-
yield event;
116+
yield { event, file, lineNumber };
78117
}
79118
}
80119
} catch (err) {
@@ -96,15 +135,31 @@ export async function* streamTelemetryEvents(
96135
}
97136
}
98137

99-
function parseLogFilename(
100-
dir: string,
101-
name: string,
102-
): TelemetryLogFile | undefined {
103-
const parsed = localJsonlFiles.parseFileName(name);
104-
if (!parsed) {
105-
return undefined;
138+
function groupLogFilesBySession(
139+
filePaths: readonly string[],
140+
): TelemetryLogFile[][] {
141+
const groups = new Map<string, TelemetryLogFile[]>();
142+
for (const file of parseLogFilePaths(filePaths).sort(compareLogFiles)) {
143+
const group = groups.get(file.session);
144+
if (group) {
145+
group.push(file);
146+
} else {
147+
groups.set(file.session, [file]);
148+
}
106149
}
107-
return { path: path.join(dir, name), ...parsed };
150+
return [...groups.values()];
151+
}
152+
153+
function parseLogFilePaths(filePaths: readonly string[]): TelemetryLogFile[] {
154+
return filePaths.flatMap((filePath) => {
155+
const file = parseLogFilePath(filePath);
156+
return file ? [file] : [];
157+
});
158+
}
159+
160+
function parseLogFilePath(filePath: string): TelemetryLogFile | undefined {
161+
const parsed = localJsonlFiles.parseFileName(path.basename(filePath));
162+
return parsed ? { path: filePath, ...parsed } : undefined;
108163
}
109164

110165
function compareLogFiles(a: TelemetryLogFile, b: TelemetryLogFile): number {
@@ -114,3 +169,20 @@ function compareLogFiles(a: TelemetryLogFile, b: TelemetryLogFile): number {
114169
a.part - b.part
115170
);
116171
}
172+
173+
function compareEventEntries(
174+
a: TelemetryEventEntry,
175+
b: TelemetryEventEntry,
176+
): number {
177+
const timestamp =
178+
parseTelemetryTimestampMs(a.event.timestamp) -
179+
parseTelemetryTimestampMs(b.event.timestamp);
180+
return (
181+
timestamp ||
182+
a.event.context.sessionId.localeCompare(b.event.context.sessionId) ||
183+
a.file.session.localeCompare(b.file.session) ||
184+
a.file.date.localeCompare(b.file.date) ||
185+
a.file.part - b.file.part ||
186+
a.lineNumber - b.lineNumber
187+
);
188+
}

src/telemetry/export/pipeline.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@ import * as fs from "node:fs/promises";
22

33
import { throwIfAborted } from "../../error/errorUtils";
44

5-
import { listTelemetryFilesForRange, streamTelemetryEvents } from "./files";
5+
import {
6+
listTelemetryFilesForRange,
7+
streamTelemetryEventsSorted,
8+
} from "./files";
69

710
import type { TelemetryEvent } from "../event";
811
import type { FlushStatus } from "../service";
@@ -58,7 +61,7 @@ export async function collectTelemetryExport(
5861

5962
runtime.report("Writing export...");
6063
const events = abortable(
61-
streamTelemetryEvents(filePaths, request.range),
64+
streamTelemetryEventsSorted(filePaths, request.range),
6265
runtime.signal,
6366
);
6467
const eventCount = await request.writer(

test/unit/telemetry/export/files.test.ts

Lines changed: 82 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
44

55
import {
66
listTelemetryFilesForRange,
7-
streamTelemetryEvents,
7+
streamTelemetryEventsSorted,
88
} from "@/telemetry/export/files";
99
import { createCustomDateRange } from "@/telemetry/export/range";
1010
import { serializeTelemetryEventLine } from "@/telemetry/wireFormat";
@@ -61,36 +61,83 @@ describe("listTelemetryFilesForRange", () => {
6161
});
6262
});
6363

64-
describe("streamTelemetryEvents", () => {
65-
it("yields only events whose timestamp falls inside the range", async () => {
64+
describe("streamTelemetryEventsSorted", () => {
65+
it("interleaves same-day sessions by event timestamp instead of filename session", async () => {
66+
writeFiles({
67+
"telemetry-2026-05-12-bbbbbbbb.jsonl":
68+
serializeTelemetryEventLine(
69+
makeSessionEvent("session-b", 0, "2026-05-12T10:00:00.000Z"),
70+
) +
71+
serializeTelemetryEventLine(
72+
makeSessionEvent("session-b", 1, "2026-05-12T10:02:00.000Z"),
73+
),
74+
"telemetry-2026-05-12-aaaaaaaa.jsonl": serializeTelemetryEventLine(
75+
makeSessionEvent("session-a", 0, "2026-05-12T10:01:00.000Z"),
76+
),
77+
});
78+
79+
const events = await collectSorted([
80+
"telemetry-2026-05-12-bbbbbbbb.jsonl",
81+
"telemetry-2026-05-12-aaaaaaaa.jsonl",
82+
]);
83+
84+
expect(events.map((event) => event.timestamp)).toEqual([
85+
"2026-05-12T10:00:00.000Z",
86+
"2026-05-12T10:01:00.000Z",
87+
"2026-05-12T10:02:00.000Z",
88+
]);
89+
});
90+
91+
it("does not compare event sequences across sessions", async () => {
92+
writeFiles({
93+
"telemetry-2026-05-12-bbbbbbbb.jsonl": serializeTelemetryEventLine(
94+
makeSessionEvent("session-b", 0, "2026-05-12T10:00:00.000Z"),
95+
),
96+
"telemetry-2026-05-12-aaaaaaaa.jsonl": serializeTelemetryEventLine(
97+
makeSessionEvent("session-a", 10, "2026-05-12T10:00:00.000Z"),
98+
),
99+
});
100+
101+
const events = await collectSorted([
102+
"telemetry-2026-05-12-bbbbbbbb.jsonl",
103+
"telemetry-2026-05-12-aaaaaaaa.jsonl",
104+
]);
105+
106+
expect(
107+
events.map((event) => [event.context.sessionId, event.eventSequence]),
108+
).toEqual([
109+
["session-a", 10],
110+
["session-b", 0],
111+
]);
112+
});
113+
114+
it("preserves events when a session timestamp moves backward", async () => {
66115
writeFiles({
67116
"telemetry-2026-05-12-aaaaaaaa.jsonl":
68117
serializeTelemetryEventLine(
69-
makeEvent({ timestamp: "2026-05-11T23:59:59.999Z" }),
118+
makeSessionEvent("session-a", 0, "2026-05-12T10:02:00.000Z"),
70119
) +
71120
serializeTelemetryEventLine(
72-
makeEvent({ timestamp: "2026-05-12T00:00:00.000Z" }),
121+
makeSessionEvent("session-a", 1, "2026-05-12T10:00:00.000Z"),
73122
),
74123
});
75124

76-
const events: TelemetryEvent[] = [];
77-
for await (const event of streamTelemetryEvents(
78-
[`${DIR}/telemetry-2026-05-12-aaaaaaaa.jsonl`],
79-
createCustomDateRange("2026-05-12", "2026-05-12"),
80-
)) {
81-
events.push(event);
82-
}
125+
const events = await collectSorted(["telemetry-2026-05-12-aaaaaaaa.jsonl"]);
83126

84-
expect(events).toHaveLength(1);
85-
expect(events[0].timestamp).toBe("2026-05-12T00:00:00.000Z");
127+
expect(events.map((event) => event.timestamp)).toEqual([
128+
"2026-05-12T10:02:00.000Z",
129+
"2026-05-12T10:00:00.000Z",
130+
]);
86131
});
87132

88133
it("surfaces parse errors with file:line context", async () => {
89134
writeFiles({
90135
"telemetry-2026-05-12-aaaaaaaa.jsonl": "{not-json}\n",
91136
});
92137

93-
await expect(drain("telemetry-2026-05-12-aaaaaaaa.jsonl")).rejects.toThrow(
138+
await expect(
139+
collectSorted(["telemetry-2026-05-12-aaaaaaaa.jsonl"]),
140+
).rejects.toThrow(
94141
/Failed to parse telemetry file telemetry-2026-05-12-aaaaaaaa\.jsonl:1/,
95142
);
96143
});
@@ -102,11 +149,27 @@ function writeFiles(files: Record<string, string>): void {
102149
}
103150
}
104151

105-
async function drain(name: string): Promise<void> {
106-
for await (const _ of streamTelemetryEvents(
107-
[`${DIR}/${name}`],
152+
function makeSessionEvent(
153+
sessionId: string,
154+
eventSequence: number,
155+
timestamp: string,
156+
): TelemetryEvent {
157+
const event = makeEvent({ timestamp, eventSequence });
158+
return {
159+
...event,
160+
context: { ...event.context, sessionId },
161+
};
162+
}
163+
164+
async function collectSorted(
165+
names: readonly string[],
166+
): Promise<TelemetryEvent[]> {
167+
const events: TelemetryEvent[] = [];
168+
for await (const event of streamTelemetryEventsSorted(
169+
names.map((name) => `${DIR}/${name}`),
108170
createCustomDateRange("2026-05-12", "2026-05-12"),
109171
)) {
110-
// Pull the iterator to surface parse errors.
172+
events.push(event);
111173
}
174+
return events;
112175
}

test/unit/telemetry/export/pipeline.test.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ import type { FlushStatus } from "@/telemetry/service";
1818

1919
vi.mock("@/telemetry/export/files", () => ({
2020
listTelemetryFilesForRange: vi.fn(),
21-
streamTelemetryEvents: vi.fn(),
21+
streamTelemetryEventsSorted: vi.fn(),
2222
}));
2323
vi.mock("node:fs/promises", () => ({ rm: vi.fn(() => Promise.resolve()) }));
2424

@@ -45,7 +45,7 @@ function setup(
4545
vi.mocked(files.listTelemetryFilesForRange).mockResolvedValue([
4646
...(opts.filePaths ?? FILE_PATHS),
4747
]);
48-
vi.mocked(files.streamTelemetryEvents).mockReturnValue(
48+
vi.mocked(files.streamTelemetryEventsSorted).mockReturnValue(
4949
asyncIterable(opts.events ?? [makeEvent()]),
5050
);
5151

0 commit comments

Comments
 (0)