diff --git a/packages/browser-core/src/browser/fetchObservable.ts b/packages/browser-core/src/browser/fetchObservable.ts index 79efde55da..1789b71252 100644 --- a/packages/browser-core/src/browser/fetchObservable.ts +++ b/packages/browser-core/src/browser/fetchObservable.ts @@ -1,5 +1,5 @@ -import type { ClocksState } from '@datadog/js-core/time' -import { clocksNow } from '@datadog/js-core/time' +import type { ClocksState, Duration } from '@datadog/js-core/time' +import { clocksNow, elapsed, toServerDuration } from '@datadog/js-core/time' import type { InstrumentedMethodCall } from '../tools/instrumentMethod' import { instrumentMethod } from '../tools/instrumentMethod' import { monitorError } from '../tools/monitor' @@ -8,7 +8,12 @@ import { normalizeUrl } from '../tools/utils/urlPolyfill' import type { GlobalObject } from '../tools/globalObject' import { globalObject } from '../tools/globalObject' import { readBytesFromStream } from '../tools/readBytesFromStream' +import { noop } from '../tools/utils/functionUtils' import { tryToClone } from '../tools/utils/responseUtils' +import type { TimeoutId } from '../tools/timer' +import { setTimeout, clearTimeout } from '../tools/timer' +import type { SseMetadata, SseTrackingEndReason } from '../tools/sse' +import { createSseEventCounter, isSseContentType, SSE_BYTE_LIMIT, SSE_TIME_LIMIT } from '../tools/sse' interface FetchContextBase { method: string @@ -38,6 +43,11 @@ export type FetchContext = FetchStartContext | FetchResolveContext type ResponseBodyActionGetter = (context: FetchResolveContext) => ResponseBodyAction +// Whether the SSE tap should run for this resolved fetch. Injected by RUM so the gate lives there. +type SseActionGetter = (context: FetchResolveContext) => boolean +// Receives SSE counts once the stream ends, delivered out of band from the `resolve` notification. +type SseMetadataListener = (context: FetchResolveContext, sseMetadata: SseMetadata) => void + /** * Action to take with the response body of a fetch request. * Values are ordered by priority: higher values take precedence when multiple actions are requested. @@ -49,11 +59,27 @@ export const enum ResponseBodyAction { let fetchObservable: Observable | undefined const responseBodyActionGetters: ResponseBodyActionGetter[] = [] +const sseActionGetters: SseActionGetter[] = [] +const sseMetadataListeners: SseMetadataListener[] = [] -export function initFetchObservable({ responseBodyAction }: { responseBodyAction?: ResponseBodyActionGetter } = {}) { +export function initFetchObservable({ + responseBodyAction, + collectSse, + onSseMetadata, +}: { + responseBodyAction?: ResponseBodyActionGetter + collectSse?: SseActionGetter + onSseMetadata?: SseMetadataListener +} = {}) { if (responseBodyAction) { responseBodyActionGetters.push(responseBodyAction) } + if (collectSse) { + sseActionGetters.push(collectSse) + } + if (onSseMetadata) { + sseMetadataListeners.push(onSseMetadata) + } if (!fetchObservable) { fetchObservable = createFetchObservable() } @@ -63,6 +89,8 @@ export function initFetchObservable({ responseBodyAction }: { responseBodyAction export function resetFetchObservable() { fetchObservable = undefined responseBodyActionGetters.length = 0 + sseActionGetters.length = 0 + sseMetadataListeners.length = 0 } function createFetchObservable() { @@ -161,7 +189,88 @@ async function afterSend( // This is not critical and should not be reported as an SDK error } } + } else if ( + sseActionGetters.some((getter) => getter(context)) && + isSseContentType(response.headers.get('content-type')) + ) { + // Read incrementally in the background so resolve (and page-activity) is not held for the + // stream's lifetime. Counts are delivered to listeners once the stream ends. + collectSseMetadata(context, response).then((sseMetadata) => { + if (sseMetadata) { + sseMetadataListeners.forEach((listener) => listener(context, sseMetadata)) + } + }, monitorError) } observable.notify({ ...context, state: 'resolve' }) } + +async function collectSseMetadata(context: FetchResolveContext, response: Response): Promise { + const clonedResponse = tryToClone(response) + if (!clonedResponse?.body) { + return + } + + const reader = clonedResponse.body.getReader() + const decoder = new TextDecoder('utf-8') + const counter = createSseEventCounter() + + let bytes = 0 + let lastEventAt: Duration | undefined + let trackingEndReason: SseTrackingEndReason = 'stream_closed' + + // Cancelling the clone does not disturb the app's own response branch. + let timedOut = false + const timeoutId: TimeoutId = setTimeout(() => { + timedOut = true + reader.cancel().catch(noop) + }, SSE_TIME_LIMIT) + + try { + while (true) { + const { done, value } = await reader.read() + if (done) { + break + } + if (!value) { + continue + } + + bytes += value.byteLength + + // Stop before decoding a chunk that crosses the cap, so a single oversized chunk cannot grow + // the decoded string or the parser's line buffer past the byte budget. + if (bytes >= SSE_BYTE_LIMIT) { + trackingEndReason = 'byte_cap' + reader.cancel().catch(noop) + break + } + + const text = decoder.decode(value, { stream: true }) + if (text) { + counter.push(text) + lastEventAt = elapsed(context.startClocks.relative, clocksNow().relative) + } + } + if (timedOut) { + trackingEndReason = 'time_cap' + } + } catch (error) { + // Stream read errors (abort, network) are not SDK errors; preserve counts gathered so far. + if (timedOut) { + trackingEndReason = 'time_cap' + } else if (context.init?.signal?.aborted || (error instanceof DOMException && error.name === 'AbortError')) { + trackingEndReason = 'aborted' + } else { + trackingEndReason = 'error' + } + } finally { + clearTimeout(timeoutId) + } + + return counter.finalize({ + lastEventAt: toServerDuration(lastEventAt), + endTime: toServerDuration(elapsed(context.startClocks.relative, clocksNow().relative)), + trackingEndReason, + }) +} diff --git a/packages/browser-core/src/index.ts b/packages/browser-core/src/index.ts index 4f6ca5fce8..c49918154b 100644 --- a/packages/browser-core/src/index.ts +++ b/packages/browser-core/src/index.ts @@ -151,6 +151,10 @@ export { CustomerDataType, CustomerContextKey, ContextManagerMethod } from './do export type { ValueHistory, ValueHistoryEntry } from './tools/valueHistory' export { createValueHistory, CLEAR_OLD_VALUES_INTERVAL } from './tools/valueHistory' export { readBytesFromStream } from './tools/readBytesFromStream' +// Only the SSE wire-metadata types are part of the public surface: rum-core consumes `SseMetadata` +// (which references the other two) on the resource event. The counter, content-type helper, and +// limit constants stay package-internal — fetchObservable imports them directly from ./tools/sse. +export type { SseMetadata, SseEventEntry, SseTrackingEndReason } from './tools/sse' export type { SessionState } from './domain/session/sessionState' export { SESSION_STORE_KEY } from './domain/session/storeStrategies/sessionStoreStrategy' export type { MemorySession } from './domain/session/storeStrategies/sessionInMemory' diff --git a/packages/browser-core/src/tools/experimentalFeatures.ts b/packages/browser-core/src/tools/experimentalFeatures.ts index 65c39b5ffa..ad62633268 100644 --- a/packages/browser-core/src/tools/experimentalFeatures.ts +++ b/packages/browser-core/src/tools/experimentalFeatures.ts @@ -16,6 +16,7 @@ import { objectHasValue } from './utils/objectUtils' export enum ExperimentalFeature { TRACK_INTAKE_REQUESTS = 'track_intake_requests', PARTIAL_VIEW_UPDATES = 'partial_view_updates', + SSE_EVENT_COUNTS = 'sse_event_counts', } const enabledExperimentalFeatures: Set = new Set() diff --git a/packages/browser-core/src/tools/sse.spec.ts b/packages/browser-core/src/tools/sse.spec.ts new file mode 100644 index 0000000000..35e1ec20b4 --- /dev/null +++ b/packages/browser-core/src/tools/sse.spec.ts @@ -0,0 +1,214 @@ +import type { ServerDuration } from '@datadog/js-core/time' +import { + createSseEventCounter, + isSseContentType, + MAX_SSE_EVENT_NAMES, + MAX_SSE_EVENT_NAME_LENGTH, + SSE_OTHER_EVENT_NAME, +} from './sse' +import type { SseFinalizeInfo } from './sse' + +const STREAM_CLOSED: SseFinalizeInfo = { trackingEndReason: 'stream_closed' } + +function countsByName(events: Array<{ name: string; count: number }> | undefined): Record { + const result: Record = {} + events?.forEach(({ name, count }) => { + result[name] = count + }) + return result +} + +describe('isSseContentType', () => { + it('is true for text/event-stream', () => { + expect(isSseContentType('text/event-stream')).toBe(true) + }) + + it('is true ignoring a charset suffix', () => { + expect(isSseContentType('text/event-stream; charset=utf-8')).toBe(true) + }) + + it('is true ignoring leading whitespace and case', () => { + expect(isSseContentType(' Text/Event-Stream')).toBe(true) + }) + + it('is false for other content types', () => { + expect(isSseContentType('application/json')).toBe(false) + }) + + it('is false for empty / nullish values', () => { + expect(isSseContentType('')).toBe(false) + expect(isSseContentType(null)).toBe(false) + expect(isSseContentType(undefined)).toBe(false) + }) +}) + +describe('createSseEventCounter', () => { + it('counts mixed named and unnamed frames', () => { + const counter = createSseEventCounter() + counter.push('event: tokenDelta\ndata: {}\n\ndata: {}\n\nevent: done\ndata: [DONE]\n\n') + const metadata = counter.finalize(STREAM_CLOSED)! + + expect(metadata.event_count).toBe(3) + expect(countsByName(metadata.events)).toEqual({ tokenDelta: 1, message: 1, done: 1 }) + }) + + it('buckets frames with no event: field under message', () => { + const counter = createSseEventCounter() + counter.push('data: a\n\ndata: b\n\n') + const metadata = counter.finalize(STREAM_CLOSED)! + + expect(countsByName(metadata.events)).toEqual({ message: 2 }) + }) + + it('counts a frame split across two push() calls exactly once', () => { + const counter = createSseEventCounter() + counter.push('event: tok') + counter.push('enDelta\ndata: {}\n') + counter.push('\n') + const metadata = counter.finalize(STREAM_CLOSED)! + + expect(metadata.event_count).toBe(1) + expect(countsByName(metadata.events)).toEqual({ tokenDelta: 1 }) + }) + + it('counts multiple data: lines in one frame as a single event', () => { + const counter = createSseEventCounter() + counter.push('event: msg\ndata: line1\ndata: line2\ndata: line3\n\n') + const metadata = counter.finalize(STREAM_CLOSED)! + + expect(metadata.event_count).toBe(1) + }) + + it('counts comment lines separately from events', () => { + const counter = createSseEventCounter() + counter.push(': keepalive\nevent: a\ndata: x\n\n: another comment\n') + const metadata = counter.finalize(STREAM_CLOSED)! + + expect(metadata.event_count).toBe(1) + expect(metadata.comment_count).toBe(2) + }) + + it('does not count an incomplete trailing frame', () => { + const counter = createSseEventCounter() + counter.push('event: a\ndata: x\n\nevent: b\ndata: y') + const metadata = counter.finalize({ ...STREAM_CLOSED, trackingEndReason: 'aborted' })! + + expect(metadata.event_count).toBe(1) + expect(countsByName(metadata.events)).toEqual({ a: 1 }) + }) + + it('returns undefined when no events were seen', () => { + const counter = createSseEventCounter() + counter.push(': only a comment\n\n') + expect(counter.finalize(STREAM_CLOSED)).toBeUndefined() + }) + + it('parses CRLF line endings identically to LF', () => { + const counter = createSseEventCounter() + counter.push('event: a\r\ndata: x\r\n\r\n') + const metadata = counter.finalize(STREAM_CLOSED)! + + expect(metadata.event_count).toBe(1) + expect(countsByName(metadata.events)).toEqual({ a: 1 }) + }) + + it('parses bare CR line endings identically to LF', () => { + const counter = createSseEventCounter() + // Bare CRs split the fields and the terminating blank line; the final LF resolves the held CR. + counter.push('event: a\rdata: x\r\r') + counter.push('\n') + const metadata = counter.finalize(STREAM_CLOSED)! + + expect(metadata.event_count).toBe(1) + expect(countsByName(metadata.events)).toEqual({ a: 1 }) + }) + + it('joins a CRLF split across two push() calls without dispatching a spurious frame', () => { + const counter = createSseEventCounter() + counter.push('event: a\rdata: x\r') // trailing CR is held back + counter.push('\n\r\n') // completes the CRLF, then a blank line ends the frame + const metadata = counter.finalize(STREAM_CLOSED)! + + expect(metadata.event_count).toBe(1) + expect(countsByName(metadata.events)).toEqual({ a: 1 }) + }) + + it('folds a customer event literally named _other_ into the overflow bucket without double-counting', () => { + const counter = createSseEventCounter() + counter.push(`event: ${SSE_OTHER_EVENT_NAME}\ndata: x\n\n`) // a real event named _other_, seen early + for (let i = 0; i < MAX_SSE_EVENT_NAMES + 2; i++) { + counter.push(`event: e${i}\ndata: x\n\n`) // fills the cap, then 2 overflow into _other_ + } + const metadata = counter.finalize(STREAM_CLOSED)! + + // _other_ never occupied a tracked slot, so it appears once with the real + overflow total (1 + 2). + expect(countsByName(metadata.events)[SSE_OTHER_EVENT_NAME]).toBe(3) + expect(metadata.events.filter((entry) => entry.name === SSE_OTHER_EVENT_NAME).length).toBe(1) + expect(metadata.event_count).toBe(MAX_SSE_EVENT_NAMES + 3) + }) + + it('truncates a very long last_event_id', () => { + const counter = createSseEventCounter() + const longId = 'a'.repeat(MAX_SSE_EVENT_NAME_LENGTH + 50) + counter.push(`id: ${longId}\ndata: x\n\n`) + const metadata = counter.finalize(STREAM_CLOSED)! + + expect(metadata.last_event_id!.length).toBe(MAX_SSE_EVENT_NAME_LENGTH) + }) + + it('tracks exactly 20 distinct names individually with no _other_', () => { + const counter = createSseEventCounter() + for (let i = 0; i < MAX_SSE_EVENT_NAMES; i++) { + counter.push(`event: e${i}\ndata: x\n\n`) + } + const metadata = counter.finalize(STREAM_CLOSED)! + + expect(metadata.events.length).toBe(MAX_SSE_EVENT_NAMES) + expect(metadata.events.some((entry) => entry.name === SSE_OTHER_EVENT_NAME)).toBe(false) + expect(metadata.event_count).toBe(MAX_SSE_EVENT_NAMES) + }) + + it('routes names beyond the cap into _other_ while keeping the grand total', () => { + const counter = createSseEventCounter() + for (let i = 0; i < MAX_SSE_EVENT_NAMES + 5; i++) { + counter.push(`event: e${i}\ndata: x\n\n`) + } + const metadata = counter.finalize(STREAM_CLOSED)! + + expect(metadata.events.length).toBe(MAX_SSE_EVENT_NAMES + 1) + expect(countsByName(metadata.events)[SSE_OTHER_EVENT_NAME]).toBe(5) + expect(metadata.event_count).toBe(MAX_SSE_EVENT_NAMES + 5) + }) + + it('truncates very long event names', () => { + const counter = createSseEventCounter() + const longName = 'x'.repeat(MAX_SSE_EVENT_NAME_LENGTH + 50) + counter.push(`event: ${longName}\ndata: y\n\n`) + const metadata = counter.finalize(STREAM_CLOSED)! + + expect(metadata.events[0].name.length).toBe(MAX_SSE_EVENT_NAME_LENGTH) + }) + + it('captures last_event_id and retry_hint', () => { + const counter = createSseEventCounter() + counter.push('id: 1\nretry: 3000\ndata: x\n\nid: 2\ndata: y\n\n') + const metadata = counter.finalize(STREAM_CLOSED)! + + expect(metadata.last_event_id).toBe('2') + expect(metadata.retry_hint).toBe(3000) + }) + + it('merges caller-provided timing and end reason into finalize', () => { + const counter = createSseEventCounter() + counter.push('data: x\n\n') + const metadata = counter.finalize({ + lastEventAt: 20 as ServerDuration, + endTime: 30 as ServerDuration, + trackingEndReason: 'byte_cap', + })! + + expect(metadata.last_event_at).toBe(20 as ServerDuration) + expect(metadata.end_time).toBe(30 as ServerDuration) + expect(metadata.tracking_end_reason).toBe('byte_cap') + }) +}) diff --git a/packages/browser-core/src/tools/sse.ts b/packages/browser-core/src/tools/sse.ts new file mode 100644 index 0000000000..03e8d9e194 --- /dev/null +++ b/packages/browser-core/src/tools/sse.ts @@ -0,0 +1,124 @@ +import type { ServerDuration } from '@datadog/js-core/time' +import { ONE_SECOND } from '@datadog/js-core/time' +import { safeTruncate } from './utils/stringUtils' +import { ONE_MEBI_BYTE } from './utils/byteUtils' +import { createSseFrameParser } from './sseParser' + +// Distinct event names tracked per resource; names beyond the cap accumulate into `_other_`. +export const MAX_SSE_EVENT_NAMES = 20 +export const MAX_SSE_EVENT_NAME_LENGTH = 64 + +// Reserved overflow bucket. A customer event literally named `_other_` is folded in (see `record`). +export const SSE_OTHER_EVENT_NAME = '_other_' + +// Caps bounding the incremental read for long-lived or never-closing streams. +export const SSE_BYTE_LIMIT = ONE_MEBI_BYTE +export const SSE_TIME_LIMIT = 30 * ONE_SECOND + +export type SseTrackingEndReason = 'stream_closed' | 'byte_cap' | 'time_cap' | 'error' | 'aborted' + +export interface SseEventEntry { + name: string + count: number +} + +export interface SseMetadata { + event_count: number + events: SseEventEntry[] + comment_count: number + last_event_id?: string + retry_hint?: number + last_event_at?: ServerDuration + end_time?: ServerDuration + tracking_end_reason: SseTrackingEndReason +} + +// Read-loop info merged in at finalize. Internal (never serialized), so camelCase; `finalize` maps +// it onto the snake_case `SseMetadata` wire shape. +export interface SseFinalizeInfo { + lastEventAt?: ServerDuration + endTime?: ServerDuration + trackingEndReason: SseTrackingEndReason +} + +export interface SseEventCounter { + push: (textChunk: string) => void + finalize: (info: SseFinalizeInfo) => SseMetadata | undefined +} + +export function isSseContentType(contentType: string | null | undefined): boolean { + return !!contentType && contentType.trim().toLowerCase().startsWith('text/event-stream') +} + +// Tallies parsed SSE frames by name, capping distinct names and length so the metadata stays +// bounded. Delegates parsing to `createSseFrameParser`; the `data:` payload is never retained. +export function createSseEventCounter(): SseEventCounter { + let eventCount = 0 + let commentCount = 0 + const counts = new Map() + let otherCount = 0 + let lastEventId: string | undefined + let retryHint: number | undefined + + function record(name: string) { + // Keep the reserved name out of `counts` so a real `_other_` event can't collide with overflow. + if (name === SSE_OTHER_EVENT_NAME) { + otherCount++ + return + } + const existing = counts.get(name) + if (existing !== undefined) { + counts.set(name, existing + 1) + } else if (counts.size < MAX_SSE_EVENT_NAMES) { + counts.set(name, 1) + } else { + otherCount++ + } + } + + const parser = createSseFrameParser({ + onEvent: (type) => { + eventCount++ + record(safeTruncate(type, MAX_SSE_EVENT_NAME_LENGTH)) + }, + onComment: () => { + commentCount++ + }, + onId: (value) => { + lastEventId = safeTruncate(value, MAX_SSE_EVENT_NAME_LENGTH) + }, + onRetry: (value) => { + retryHint = value + }, + }) + + return { + push: parser.push, + + finalize(info: SseFinalizeInfo): SseMetadata | undefined { + // An incomplete trailing frame (no terminating blank line) is intentionally not counted. + if (eventCount === 0) { + return undefined + } + + const events: SseEventEntry[] = [] + counts.forEach((count, name) => { + events.push({ name, count }) + }) + if (otherCount > 0) { + events.push({ name: SSE_OTHER_EVENT_NAME, count: otherCount }) + } + + return { + event_count: eventCount, + events, + comment_count: commentCount, + last_event_id: lastEventId, + retry_hint: retryHint, + last_event_at: info.lastEventAt, + end_time: info.endTime, + tracking_end_reason: info.trackingEndReason, + } + }, + } +} diff --git a/packages/browser-core/src/tools/sseParser.spec.ts b/packages/browser-core/src/tools/sseParser.spec.ts new file mode 100644 index 0000000000..a3a527dde5 --- /dev/null +++ b/packages/browser-core/src/tools/sseParser.spec.ts @@ -0,0 +1,87 @@ +import { createSseFrameParser } from './sseParser' +import type { SseFrameHandlers } from './sseParser' + +function trackingHandlers() { + const events: string[] = [] + const ids: string[] = [] + const retries: number[] = [] + let comments = 0 + const handlers: SseFrameHandlers = { + onEvent: (type) => events.push(type), + onComment: () => { + comments++ + }, + onId: (value) => ids.push(value), + onRetry: (value) => retries.push(value), + } + return { + handlers, + events, + ids, + retries, + get comments() { + return comments + }, + } +} + +describe('createSseFrameParser', () => { + it('dispatches a named frame on the terminating blank line', () => { + const tracker = trackingHandlers() + const parser = createSseFrameParser(tracker.handlers) + + parser.push('event: tokenDelta\ndata: {}\n\n') + + expect(tracker.events).toEqual(['tokenDelta']) + }) + + it('defaults the event type to message when no event: field is present', () => { + const tracker = trackingHandlers() + const parser = createSseFrameParser(tracker.handlers) + + parser.push('data: a\n\ndata: b\n\n') + + expect(tracker.events).toEqual(['message', 'message']) + }) + + it('does not dispatch a frame that carried no data: line', () => { + const tracker = trackingHandlers() + const parser = createSseFrameParser(tracker.handlers) + + parser.push('event: ping\n\n') + + expect(tracker.events).toEqual([]) + }) + + it('joins a frame split across push() calls and dispatches it once', () => { + const tracker = trackingHandlers() + const parser = createSseFrameParser(tracker.handlers) + + parser.push('event: tok') + parser.push('enDelta\ndata: {}\n') + parser.push('\n') + + expect(tracker.events).toEqual(['tokenDelta']) + }) + + it('reports comments and the latest id / retry without dispatching frames for them', () => { + const tracker = trackingHandlers() + const parser = createSseFrameParser(tracker.handlers) + + parser.push(': keepalive\nid: 1\nretry: 3000\ndata: x\n\nid: 2\ndata: y\n\n') + + expect(tracker.events).toEqual(['message', 'message']) + expect(tracker.comments).toBe(1) + expect(tracker.ids).toEqual(['1', '2']) + expect(tracker.retries).toEqual([3000]) + }) + + it('ignores a non-numeric retry value', () => { + const tracker = trackingHandlers() + const parser = createSseFrameParser(tracker.handlers) + + parser.push('retry: soon\ndata: x\n\n') + + expect(tracker.retries).toEqual([]) + }) +}) diff --git a/packages/browser-core/src/tools/sseParser.ts b/packages/browser-core/src/tools/sseParser.ts new file mode 100644 index 0000000000..f2ab428435 --- /dev/null +++ b/packages/browser-core/src/tools/sseParser.ts @@ -0,0 +1,107 @@ +// Callbacks the SSE frame parser invokes as it consumes decoded text. The parser implements the +// WHATWG event-stream line/frame model only; consumers decide how to aggregate. +export interface SseFrameHandlers { + // A dispatched event frame (blank line after >= 1 `data:` line); `type` is `event:` or `message`. + onEvent: (type: string) => void + onComment: () => void + onId: (value: string) => void + onRetry: (value: number) => void +} + +export interface SseFrameParser { + push: (textChunk: string) => void +} + +// Stateful, allocation-bounded SSE frame parser. Buffers only a trailing partial line, splits on +// LF / CRLF / bare CR, parses fields and comments, and dispatches frames on blank lines. +export function createSseFrameParser(handlers: SseFrameHandlers): SseFrameParser { + let lineBuffer = '' + let currentEventType: string | undefined + let frameHasData = false + + function dispatchFrame() { + if (frameHasData) { + handlers.onEvent(currentEventType !== undefined ? currentEventType : 'message') + } + currentEventType = undefined + frameHasData = false + } + + function processLine(line: string) { + if (line === '') { + dispatchFrame() + return + } + if (line.charCodeAt(0) === 58 /* ':' */) { + handlers.onComment() + return + } + + const colonIndex = line.indexOf(':') + let field: string + let value: string + if (colonIndex === -1) { + field = line + value = '' + } else { + field = line.slice(0, colonIndex) + value = line.slice(colonIndex + 1) + // A single leading space after the colon is part of the format, not the value. + if (value.charCodeAt(0) === 32 /* ' ' */) { + value = value.slice(1) + } + } + + switch (field) { + case 'event': + currentEventType = value + break + case 'data': + frameHasData = true + break + case 'id': + handlers.onId(value) + break + case 'retry': + if (/^\d+$/.test(value)) { + handlers.onRetry(parseInt(value, 10)) + } + break + default: + break + } + } + + return { + push(textChunk: string) { + if (!textChunk) { + return + } + lineBuffer += textChunk + let start = 0 + let i = 0 + while (i < lineBuffer.length) { + const code = lineBuffer.charCodeAt(i) + if (code === 10 /* '\n' */) { + processLine(lineBuffer.slice(start, i)) + i++ + start = i + } else if (code === 13 /* '\r' */) { + // A trailing CR may be the first half of a CRLF spanning the next chunk; hold it back. + if (i === lineBuffer.length - 1) { + break + } + processLine(lineBuffer.slice(start, i)) + i++ + if (lineBuffer.charCodeAt(i) === 10 /* '\n' */) { + i++ + } + start = i + } else { + i++ + } + } + lineBuffer = lineBuffer.slice(start) + }, + } +} diff --git a/packages/browser-core/test/emulate/mockFetch.ts b/packages/browser-core/test/emulate/mockFetch.ts index 278091f1e2..d59575440a 100644 --- a/packages/browser-core/test/emulate/mockFetch.ts +++ b/packages/browser-core/test/emulate/mockFetch.ts @@ -132,7 +132,7 @@ export class MockResponse implements Response { } get headers() { - return notYetImplemented() + return new Headers(this.options.headers) } get redirected() { @@ -161,6 +161,7 @@ export interface MockResponseOptions { body?: ReadableStream bodyUsed?: boolean bodyDisturbed?: boolean + headers?: Record } export type MockFetch = (input: RequestInfo, init?: RequestInit) => MockFetchPromise diff --git a/packages/browser-rum-core/src/domain/lifeCycle.ts b/packages/browser-rum-core/src/domain/lifeCycle.ts index 4e9e0626b2..ac58be9587 100644 --- a/packages/browser-rum-core/src/domain/lifeCycle.ts +++ b/packages/browser-rum-core/src/domain/lifeCycle.ts @@ -3,7 +3,7 @@ import type { Context, PageMayExitEvent, RawError } from '@datadog/browser-core' import { AbstractLifeCycle } from '@datadog/browser-core' import type { RumEventDomainContext } from '../domainContext.types' import type { RawRumEvent, AssembledRumEvent } from '../rawRumEvent.types' -import type { RequestCompleteEvent, RequestStartEvent } from './requestCollection' +import type { RequestCompleteEvent, RequestStartEvent, SseMetadataCollectedEvent } from './requestCollection' import type { AutoAction } from './action/actionCollection' import type { ViewEvent, ViewCreatedEvent, ViewEndedEvent, BeforeViewUpdateEvent } from './view/trackViews' import type { DurationVitalStart } from './vital/vitalCollection' @@ -22,6 +22,7 @@ export const enum LifeCycleEventType { AFTER_VIEW_ENDED, REQUEST_STARTED, REQUEST_COMPLETED, + SSE_METADATA_COLLECTED, // The SESSION_EXPIRED lifecycle event has been introduced to represent when a session has expired // and trigger cleanup tasks related to this, prior to renewing the session. Its implementation is @@ -67,6 +68,7 @@ declare const LifeCycleEventTypeAsConst: { AFTER_VIEW_ENDED: LifeCycleEventType.AFTER_VIEW_ENDED REQUEST_STARTED: LifeCycleEventType.REQUEST_STARTED REQUEST_COMPLETED: LifeCycleEventType.REQUEST_COMPLETED + SSE_METADATA_COLLECTED: LifeCycleEventType.SSE_METADATA_COLLECTED SESSION_EXPIRED: LifeCycleEventType.SESSION_EXPIRED SESSION_RENEWED: LifeCycleEventType.SESSION_RENEWED PAGE_MAY_EXIT: LifeCycleEventType.PAGE_MAY_EXIT @@ -89,6 +91,7 @@ export interface LifeCycleEventMap { [LifeCycleEventTypeAsConst.AFTER_VIEW_ENDED]: ViewEndedEvent [LifeCycleEventTypeAsConst.REQUEST_STARTED]: RequestStartEvent [LifeCycleEventTypeAsConst.REQUEST_COMPLETED]: RequestCompleteEvent + [LifeCycleEventTypeAsConst.SSE_METADATA_COLLECTED]: SseMetadataCollectedEvent [LifeCycleEventTypeAsConst.SESSION_EXPIRED]: void [LifeCycleEventTypeAsConst.SESSION_RENEWED]: void [LifeCycleEventTypeAsConst.PAGE_MAY_EXIT]: PageMayExitEvent diff --git a/packages/browser-rum-core/src/domain/requestCollection.ts b/packages/browser-rum-core/src/domain/requestCollection.ts index 4754b998ac..7862ffc056 100644 --- a/packages/browser-rum-core/src/domain/requestCollection.ts +++ b/packages/browser-rum-core/src/domain/requestCollection.ts @@ -10,11 +10,14 @@ import type { Observable, BufferedData, Subscription, + SseMetadata, } from '@datadog/browser-core' import { RequestType, ResponseBodyAction, BufferedDataType, + ExperimentalFeature, + isExperimentalFeatureEnabled, initFetchObservable, initXhrObservable, } from '@datadog/browser-core' @@ -66,6 +69,11 @@ export interface RequestCompleteEvent { responseBody?: string } +export interface SseMetadataCollectedEvent { + requestIndex: number + sseMetadata: SseMetadata +} + let nextRequestIndex = 1 export function startRequestCollection( @@ -147,6 +155,16 @@ export function trackFetch( } return ResponseBodyAction.IGNORE }, + collectSse: () => isExperimentalFeatureEnabled(ExperimentalFeature.SSE_EVENT_COUNTS), + onSseMetadata: (context, sseMetadata) => { + const fetchContext = context as RumFetchResolveContext + if (isAllowedRequestUrl(fetchContext.url)) { + lifeCycle.notify(LifeCycleEventType.SSE_METADATA_COLLECTED, { + requestIndex: fetchContext.requestIndex, + sseMetadata, + }) + } + }, }).subscribe((context) => { if (context.state === 'start' && isAllowedRequestUrl(context.url)) { tracer.traceFetch(context) diff --git a/packages/browser-rum-core/src/domain/resource/resourceCollection.spec.ts b/packages/browser-rum-core/src/domain/resource/resourceCollection.spec.ts index 3adf084b2c..b4c61cd724 100644 --- a/packages/browser-rum-core/src/domain/resource/resourceCollection.spec.ts +++ b/packages/browser-rum-core/src/domain/resource/resourceCollection.spec.ts @@ -1,5 +1,5 @@ import type { RelativeTime, Duration, ServerDuration, TimeStamp } from '@datadog/js-core/time' -import type { MatchOption, TaskQueue } from '@datadog/browser-core' +import type { MatchOption, TaskQueue, SseMetadata } from '@datadog/browser-core' import { elapsed, toServerDuration } from '@datadog/js-core/time' import { createTaskQueue, display, RequestType, ResourceType } from '@datadog/browser-core' import type { Clock, MockTelemetry } from '@datadog/browser-core/test' @@ -96,6 +96,7 @@ describe('resourceCollection', () => { render_blocking_status: 'blocking', method: undefined, graphql: undefined, + sse: undefined, }, type: RumEventType.RESOURCE, _dd: { @@ -151,6 +152,7 @@ describe('resourceCollection', () => { download: { duration: 100000000 as ServerDuration, start: 0 as ServerDuration }, first_byte: { duration: 0 as ServerDuration, start: 0 as ServerDuration }, graphql: undefined, + sse: undefined, }, type: RumEventType.RESOURCE, _dd: { @@ -170,6 +172,30 @@ describe('resourceCollection', () => { }) }) + describe('SSE metadata enrichment', () => { + const sseMetadata: SseMetadata = { + event_count: 3, + events: [{ name: 'message', count: 3 }], + comment_count: 1, + tracking_end_reason: 'stream_closed', + } + + it('attaches sse metadata delivered (out of band) before the resource is assembled', () => { + setupResourceCollection() + lifeCycle.notify(LifeCycleEventType.SSE_METADATA_COLLECTED, { requestIndex: 42, sseMetadata }) + notifyRequest({ request: { requestIndex: 42, type: RequestType.FETCH } }) + + expect((rawRumEvents[0].rawRumEvent as RawRumResourceEvent).resource.sse).toEqual(sseMetadata) + }) + + it('leaves sse undefined when no metadata was delivered for the request', () => { + setupResourceCollection() + notifyRequest({ request: { requestIndex: 7, type: RequestType.FETCH } }) + + expect((rawRumEvents[0].rawRumEvent as RawRumResourceEvent).resource.sse).toBeUndefined() + }) + }) + describe('GraphQL metadata enrichment', () => { interface TestCase { requestType: RequestType @@ -361,6 +387,7 @@ describe('resourceCollection', () => { download: { duration: 100000000 as ServerDuration, start: 0 as ServerDuration }, first_byte: { duration: 0 as ServerDuration, start: 0 as ServerDuration }, graphql: undefined, + sse: undefined, }, type: RumEventType.RESOURCE, _dd: { @@ -471,6 +498,7 @@ describe('resourceCollection', () => { first_byte: { duration: 0 as ServerDuration, start: 0 as ServerDuration }, url: 'https://resource.com/valid', graphql: undefined, + sse: undefined, }, type: RumEventType.RESOURCE, _dd: { diff --git a/packages/browser-rum-core/src/domain/resource/resourceCollection.ts b/packages/browser-rum-core/src/domain/resource/resourceCollection.ts index 3b727b3bdc..19bbce0026 100644 --- a/packages/browser-rum-core/src/domain/resource/resourceCollection.ts +++ b/packages/browser-rum-core/src/domain/resource/resourceCollection.ts @@ -13,6 +13,7 @@ import { RequestType, setTimeout, } from '@datadog/browser-core' +import type { SseMetadata } from '@datadog/browser-core' import type { MatchHeader, RumConfiguration } from '../configuration' import { RumPerformanceEntryType, createPerformanceObservable } from '../../browser/performanceObservable' import type { RumResourceEventDomainContext } from '../../domainContext.types' @@ -47,10 +48,25 @@ import { trackManualResources } from './trackManualResources' // site in `startResourceCollection` for the rationale. export const REQUEST_MATCHING_DELAY = 50 as Duration +// SSE counts arrive out of band (the stream ends after resolve); hold them until the matching +// resource is assembled. Capped so a resource that never assembles cannot leak the map. +const MAX_PENDING_SSE_METADATA = 1000 + export function startResourceCollection(lifeCycle: LifeCycle, configuration: RumConfiguration) { const taskQueue = mockable(createTaskQueue)() const requestRegistry = createRequestRegistry(lifeCycle) + const sseMetadataByRequestIndex = new Map() + const sseMetadataSubscription = lifeCycle.subscribe( + LifeCycleEventType.SSE_METADATA_COLLECTED, + ({ requestIndex, sseMetadata }) => { + sseMetadataByRequestIndex.set(requestIndex, sseMetadata) + if (sseMetadataByRequestIndex.size > MAX_PENDING_SSE_METADATA) { + sseMetadataByRequestIndex.delete(sseMetadataByRequestIndex.keys().next().value!) + } + } + ) + const performanceResourceSubscription = createPerformanceObservable(configuration, { type: RumPerformanceEntryType.RESOURCE, buffered: true, @@ -69,7 +85,12 @@ export function startResourceCollection(lifeCycle: LifeCycle, configuration: Rum // Note: we could clear the timeout on stop(), but this requires a bit of bookkeeping that // is not necessary right now. We could reevaluate in the future. setTimeout(() => { - const rawEvent = assembleResource(entry, requestRegistry.getMatchingRequest(entry), configuration) + const rawEvent = assembleResource( + entry, + requestRegistry.getMatchingRequest(entry), + configuration, + sseMetadataByRequestIndex + ) if (rawEvent) { lifeCycle.notify(LifeCycleEventType.RAW_RUM_EVENT_COLLECTED, rawEvent) } @@ -103,6 +124,7 @@ export function startResourceCollection(lifeCycle: LifeCycle, configuration: Rum stopRunOnReadyState() taskQueue.stop() performanceResourceSubscription.unsubscribe() + sseMetadataSubscription.unsubscribe() resourceTracker.stopAll() }, } @@ -111,7 +133,8 @@ export function startResourceCollection(lifeCycle: LifeCycle, configuration: Rum function assembleResource( entry: ResourceLikeEntry, request: RequestCompleteEvent | undefined, - configuration: RumConfiguration + configuration: RumConfiguration, + sseMetadataByRequestIndex?: Map ): RawRumEventCollectedData | undefined { const tracingInfo = request ? computeRequestTracingInfo(request, configuration) @@ -138,6 +161,7 @@ function assembleResource( protocol: computeResourceEntryProtocol(entry), delivery_type: computeResourceEntryDeliveryType(entry), graphql: request && computeGraphQlMetaData(request, configuration), + sse: request && computeSseMetadata(request, sseMetadataByRequestIndex), render_blocking_status: entry.renderBlockingStatus, ...computeResourceEntrySize(entry), ...computeResourceEntryDetails(entry), @@ -172,6 +196,19 @@ function computeGraphQlMetaData( return extractGraphQlMetadata(request, graphQlConfig) } +// SSE counts arrive after resolve (when the stream ends), keyed by request index. Consume the +// pending entry, if any, when the matching resource is assembled. +function computeSseMetadata( + request: RequestCompleteEvent, + sseMetadataByRequestIndex?: Map +): SseMetadata | undefined { + const sseMetadata = sseMetadataByRequestIndex?.get(request.requestIndex) + if (sseMetadata) { + sseMetadataByRequestIndex!.delete(request.requestIndex) + } + return sseMetadata +} + function computeContentTypeFromPerformanceEntry( entry: ResourceLikeEntry ): { resource: Pick } | undefined { diff --git a/packages/browser-rum-core/src/rawRumEvent.types.ts b/packages/browser-rum-core/src/rawRumEvent.types.ts index a115388795..c28a0a46e0 100644 --- a/packages/browser-rum-core/src/rawRumEvent.types.ts +++ b/packages/browser-rum-core/src/rawRumEvent.types.ts @@ -7,6 +7,7 @@ import type { DefaultPrivacyLevel, Csp, Context, + SseMetadata, } from '@datadog/browser-core' import type { GraphQlMetadata } from './domain/resource/graphql' import type { PageState } from './domain/contexts/pageStateHistory' @@ -73,6 +74,7 @@ export interface RawRumResourceEvent { protocol?: string delivery_type?: DeliveryType graphql?: GraphQlMetadata + sse?: SseMetadata request?: ResourceRequest response?: ResourceResponse } diff --git a/scripts/dev-server/lib/server.ts b/scripts/dev-server/lib/server.ts index c045a925a8..c0fe32e185 100644 --- a/scripts/dev-server/lib/server.ts +++ b/scripts/dev-server/lib/server.ts @@ -45,6 +45,52 @@ export function runServer({ writeIntakeFile = true }: { writeIntakeFile?: boolea }) ) + // Local Server-Sent Events endpoint for manually exercising SSE resource tracking. + // Query params: ?count= (default 3), ?delay= (default 100). + app.get('/sse', (req, res) => { + // ?prehead: destroy the socket before sending any headers (mimics wifi cut during connection + // setup -> fetch() itself rejects, no Response is ever returned). + if (req.query.prehead !== undefined) { + res.socket?.destroy() + return + } + + res.writeHead(200, { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache', + Connection: 'keep-alive', + }) + + const count = Number(req.query.count) || 3 + const delay = Number(req.query.delay) || 100 + // ?abort=: after writing n frames, kill the socket abnormally (mimics a wifi cut mid-stream). + const abortAfter = req.query.abort !== undefined ? Number(req.query.abort) || 2 : undefined + const frames = [': keepalive'] + for (let i = 0; i < count; i++) { + frames.push(`event: tokenDelta\ndata: {"index":${i}}`) + } + frames.push('data: {"unnamed":true}') // dispatches as `message` + frames.push('id: 42\nretry: 3000\nevent: done\ndata: [DONE]') + + let index = 0 + const timer = setInterval(() => { + if (abortAfter !== undefined && index >= abortAfter) { + clearInterval(timer) + res.socket?.destroy() // break the connection without a clean close + return + } + if (index >= frames.length) { + clearInterval(timer) + res.end() + return + } + res.write(`${frames[index]}\n\n`) + index++ + }, delay) + + req.on('close', () => clearInterval(timer)) + }) + app.use(createStaticSandboxApp()) app.use('/react-app', createReactApp())