Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 112 additions & 3 deletions packages/browser-core/src/browser/fetchObservable.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { ClocksState } from '@datadog/js-core/time'
import { clocksNow } from '@datadog/js-core/time'
import type { ClocksState, Duration } from '@datadog/js-core/time'
import { clocksNow, elapsed, toServerDuration } from '@datadog/js-core/time'
import type { InstrumentedMethodCall } from '../tools/instrumentMethod'
import { instrumentMethod } from '../tools/instrumentMethod'
import { monitorError } from '../tools/monitor'
Expand All @@ -8,7 +8,12 @@ import { normalizeUrl } from '../tools/utils/urlPolyfill'
import type { GlobalObject } from '../tools/globalObject'
import { globalObject } from '../tools/globalObject'
import { readBytesFromStream } from '../tools/readBytesFromStream'
import { noop } from '../tools/utils/functionUtils'
import { tryToClone } from '../tools/utils/responseUtils'
import type { TimeoutId } from '../tools/timer'
import { setTimeout, clearTimeout } from '../tools/timer'
import type { SseMetadata, SseTrackingEndReason } from '../tools/sse'
import { createSseEventCounter, isSseContentType, SSE_BYTE_LIMIT, SSE_TIME_LIMIT } from '../tools/sse'

interface FetchContextBase {
method: string
Expand Down Expand Up @@ -38,6 +43,11 @@ export type FetchContext = FetchStartContext | FetchResolveContext

type ResponseBodyActionGetter = (context: FetchResolveContext) => ResponseBodyAction

// Whether the SSE tap should run for this resolved fetch. Injected by RUM so the gate lives there.
type SseActionGetter = (context: FetchResolveContext) => boolean
// Receives SSE counts once the stream ends, delivered out of band from the `resolve` notification.
type SseMetadataListener = (context: FetchResolveContext, sseMetadata: SseMetadata) => void

/**
* Action to take with the response body of a fetch request.
* Values are ordered by priority: higher values take precedence when multiple actions are requested.
Expand All @@ -49,11 +59,27 @@ export const enum ResponseBodyAction {

let fetchObservable: Observable<FetchContext> | undefined
const responseBodyActionGetters: ResponseBodyActionGetter[] = []
const sseActionGetters: SseActionGetter[] = []
const sseMetadataListeners: SseMetadataListener[] = []

export function initFetchObservable({ responseBodyAction }: { responseBodyAction?: ResponseBodyActionGetter } = {}) {
export function initFetchObservable({
responseBodyAction,
collectSse,
onSseMetadata,
}: {
responseBodyAction?: ResponseBodyActionGetter
collectSse?: SseActionGetter
onSseMetadata?: SseMetadataListener
} = {}) {
if (responseBodyAction) {
responseBodyActionGetters.push(responseBodyAction)
}
if (collectSse) {
sseActionGetters.push(collectSse)
}
if (onSseMetadata) {
sseMetadataListeners.push(onSseMetadata)
}
if (!fetchObservable) {
fetchObservable = createFetchObservable()
}
Expand All @@ -63,6 +89,8 @@ export function initFetchObservable({ responseBodyAction }: { responseBodyAction
export function resetFetchObservable() {
fetchObservable = undefined
responseBodyActionGetters.length = 0
sseActionGetters.length = 0
sseMetadataListeners.length = 0
}

function createFetchObservable() {
Expand Down Expand Up @@ -161,7 +189,88 @@ async function afterSend(
// This is not critical and should not be reported as an SDK error
}
}
} else if (
sseActionGetters.some((getter) => getter(context)) &&
isSseContentType(response.headers.get('content-type'))
) {
// Read incrementally in the background so resolve (and page-activity) is not held for the
// stream's lifetime. Counts are delivered to listeners once the stream ends.
collectSseMetadata(context, response).then((sseMetadata) => {
if (sseMetadata) {
sseMetadataListeners.forEach((listener) => listener(context, sseMetadata))
}
}, monitorError)
}

observable.notify({ ...context, state: 'resolve' })
}

async function collectSseMetadata(context: FetchResolveContext, response: Response): Promise<SseMetadata | undefined> {
const clonedResponse = tryToClone(response)
if (!clonedResponse?.body) {
return
}

const reader = clonedResponse.body.getReader()
const decoder = new TextDecoder('utf-8')
const counter = createSseEventCounter()

let bytes = 0
let lastEventAt: Duration | undefined
let trackingEndReason: SseTrackingEndReason = 'stream_closed'

// Cancelling the clone does not disturb the app's own response branch.
let timedOut = false
const timeoutId: TimeoutId = setTimeout(() => {
timedOut = true
reader.cancel().catch(noop)
}, SSE_TIME_LIMIT)

try {
while (true) {
const { done, value } = await reader.read()
if (done) {
break
}
if (!value) {
continue
}

bytes += value.byteLength

// Stop before decoding a chunk that crosses the cap, so a single oversized chunk cannot grow
// the decoded string or the parser's line buffer past the byte budget.
if (bytes >= SSE_BYTE_LIMIT) {
trackingEndReason = 'byte_cap'
reader.cancel().catch(noop)
break
}

const text = decoder.decode(value, { stream: true })
if (text) {
counter.push(text)
lastEventAt = elapsed(context.startClocks.relative, clocksNow().relative)
}
}
if (timedOut) {
trackingEndReason = 'time_cap'
}
} catch (error) {
// Stream read errors (abort, network) are not SDK errors; preserve counts gathered so far.
if (timedOut) {
trackingEndReason = 'time_cap'
} else if (context.init?.signal?.aborted || (error instanceof DOMException && error.name === 'AbortError')) {
trackingEndReason = 'aborted'
} else {
trackingEndReason = 'error'
}
} finally {
clearTimeout(timeoutId)
}

return counter.finalize({
lastEventAt: toServerDuration(lastEventAt),
endTime: toServerDuration(elapsed(context.startClocks.relative, clocksNow().relative)),
trackingEndReason,
})
}
4 changes: 4 additions & 0 deletions packages/browser-core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,10 @@ export { CustomerDataType, CustomerContextKey, ContextManagerMethod } from './do
export type { ValueHistory, ValueHistoryEntry } from './tools/valueHistory'
export { createValueHistory, CLEAR_OLD_VALUES_INTERVAL } from './tools/valueHistory'
export { readBytesFromStream } from './tools/readBytesFromStream'
// Only the SSE wire-metadata types are part of the public surface: rum-core consumes `SseMetadata`
// (which references the other two) on the resource event. The counter, content-type helper, and
// limit constants stay package-internal — fetchObservable imports them directly from ./tools/sse.
export type { SseMetadata, SseEventEntry, SseTrackingEndReason } from './tools/sse'
export type { SessionState } from './domain/session/sessionState'
export { SESSION_STORE_KEY } from './domain/session/storeStrategies/sessionStoreStrategy'
export type { MemorySession } from './domain/session/storeStrategies/sessionInMemory'
Expand Down
1 change: 1 addition & 0 deletions packages/browser-core/src/tools/experimentalFeatures.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import { objectHasValue } from './utils/objectUtils'
export enum ExperimentalFeature {
TRACK_INTAKE_REQUESTS = 'track_intake_requests',
PARTIAL_VIEW_UPDATES = 'partial_view_updates',
SSE_EVENT_COUNTS = 'sse_event_counts',
}

const enabledExperimentalFeatures: Set<ExperimentalFeature> = new Set()
Expand Down
214 changes: 214 additions & 0 deletions packages/browser-core/src/tools/sse.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
import type { ServerDuration } from '@datadog/js-core/time'
import {
createSseEventCounter,
isSseContentType,
MAX_SSE_EVENT_NAMES,
MAX_SSE_EVENT_NAME_LENGTH,
SSE_OTHER_EVENT_NAME,
} from './sse'
import type { SseFinalizeInfo } from './sse'

const STREAM_CLOSED: SseFinalizeInfo = { trackingEndReason: 'stream_closed' }

function countsByName(events: Array<{ name: string; count: number }> | undefined): Record<string, number> {
const result: Record<string, number> = {}
events?.forEach(({ name, count }) => {
result[name] = count
})
return result
}

describe('isSseContentType', () => {
it('is true for text/event-stream', () => {
expect(isSseContentType('text/event-stream')).toBe(true)
})

it('is true ignoring a charset suffix', () => {
expect(isSseContentType('text/event-stream; charset=utf-8')).toBe(true)
})

it('is true ignoring leading whitespace and case', () => {
expect(isSseContentType(' Text/Event-Stream')).toBe(true)
})

it('is false for other content types', () => {
expect(isSseContentType('application/json')).toBe(false)
})

it('is false for empty / nullish values', () => {
expect(isSseContentType('')).toBe(false)
expect(isSseContentType(null)).toBe(false)
expect(isSseContentType(undefined)).toBe(false)
})
})

describe('createSseEventCounter', () => {
it('counts mixed named and unnamed frames', () => {
const counter = createSseEventCounter()
counter.push('event: tokenDelta\ndata: {}\n\ndata: {}\n\nevent: done\ndata: [DONE]\n\n')
const metadata = counter.finalize(STREAM_CLOSED)!

expect(metadata.event_count).toBe(3)
expect(countsByName(metadata.events)).toEqual({ tokenDelta: 1, message: 1, done: 1 })
})

it('buckets frames with no event: field under message', () => {
const counter = createSseEventCounter()
counter.push('data: a\n\ndata: b\n\n')
const metadata = counter.finalize(STREAM_CLOSED)!

expect(countsByName(metadata.events)).toEqual({ message: 2 })
})

it('counts a frame split across two push() calls exactly once', () => {
const counter = createSseEventCounter()
counter.push('event: tok')
counter.push('enDelta\ndata: {}\n')
counter.push('\n')
const metadata = counter.finalize(STREAM_CLOSED)!

expect(metadata.event_count).toBe(1)
expect(countsByName(metadata.events)).toEqual({ tokenDelta: 1 })
})

it('counts multiple data: lines in one frame as a single event', () => {
const counter = createSseEventCounter()
counter.push('event: msg\ndata: line1\ndata: line2\ndata: line3\n\n')
const metadata = counter.finalize(STREAM_CLOSED)!

expect(metadata.event_count).toBe(1)
})

it('counts comment lines separately from events', () => {
const counter = createSseEventCounter()
counter.push(': keepalive\nevent: a\ndata: x\n\n: another comment\n')
const metadata = counter.finalize(STREAM_CLOSED)!

expect(metadata.event_count).toBe(1)
expect(metadata.comment_count).toBe(2)
})

it('does not count an incomplete trailing frame', () => {
const counter = createSseEventCounter()
counter.push('event: a\ndata: x\n\nevent: b\ndata: y')
const metadata = counter.finalize({ ...STREAM_CLOSED, trackingEndReason: 'aborted' })!

expect(metadata.event_count).toBe(1)
expect(countsByName(metadata.events)).toEqual({ a: 1 })
})

it('returns undefined when no events were seen', () => {
const counter = createSseEventCounter()
counter.push(': only a comment\n\n')
expect(counter.finalize(STREAM_CLOSED)).toBeUndefined()
})

it('parses CRLF line endings identically to LF', () => {
const counter = createSseEventCounter()
counter.push('event: a\r\ndata: x\r\n\r\n')
const metadata = counter.finalize(STREAM_CLOSED)!

expect(metadata.event_count).toBe(1)
expect(countsByName(metadata.events)).toEqual({ a: 1 })
})

it('parses bare CR line endings identically to LF', () => {
const counter = createSseEventCounter()
// Bare CRs split the fields and the terminating blank line; the final LF resolves the held CR.
counter.push('event: a\rdata: x\r\r')
counter.push('\n')
const metadata = counter.finalize(STREAM_CLOSED)!

expect(metadata.event_count).toBe(1)
expect(countsByName(metadata.events)).toEqual({ a: 1 })
})

it('joins a CRLF split across two push() calls without dispatching a spurious frame', () => {
const counter = createSseEventCounter()
counter.push('event: a\rdata: x\r') // trailing CR is held back
counter.push('\n\r\n') // completes the CRLF, then a blank line ends the frame
const metadata = counter.finalize(STREAM_CLOSED)!

expect(metadata.event_count).toBe(1)
expect(countsByName(metadata.events)).toEqual({ a: 1 })
})

it('folds a customer event literally named _other_ into the overflow bucket without double-counting', () => {
const counter = createSseEventCounter()
counter.push(`event: ${SSE_OTHER_EVENT_NAME}\ndata: x\n\n`) // a real event named _other_, seen early
for (let i = 0; i < MAX_SSE_EVENT_NAMES + 2; i++) {
counter.push(`event: e${i}\ndata: x\n\n`) // fills the cap, then 2 overflow into _other_
}
const metadata = counter.finalize(STREAM_CLOSED)!

// _other_ never occupied a tracked slot, so it appears once with the real + overflow total (1 + 2).
expect(countsByName(metadata.events)[SSE_OTHER_EVENT_NAME]).toBe(3)
expect(metadata.events.filter((entry) => entry.name === SSE_OTHER_EVENT_NAME).length).toBe(1)
expect(metadata.event_count).toBe(MAX_SSE_EVENT_NAMES + 3)
})

it('truncates a very long last_event_id', () => {
const counter = createSseEventCounter()
const longId = 'a'.repeat(MAX_SSE_EVENT_NAME_LENGTH + 50)
counter.push(`id: ${longId}\ndata: x\n\n`)
const metadata = counter.finalize(STREAM_CLOSED)!

expect(metadata.last_event_id!.length).toBe(MAX_SSE_EVENT_NAME_LENGTH)
})

it('tracks exactly 20 distinct names individually with no _other_', () => {
const counter = createSseEventCounter()
for (let i = 0; i < MAX_SSE_EVENT_NAMES; i++) {
counter.push(`event: e${i}\ndata: x\n\n`)
}
const metadata = counter.finalize(STREAM_CLOSED)!

expect(metadata.events.length).toBe(MAX_SSE_EVENT_NAMES)
expect(metadata.events.some((entry) => entry.name === SSE_OTHER_EVENT_NAME)).toBe(false)
expect(metadata.event_count).toBe(MAX_SSE_EVENT_NAMES)
})

it('routes names beyond the cap into _other_ while keeping the grand total', () => {
const counter = createSseEventCounter()
for (let i = 0; i < MAX_SSE_EVENT_NAMES + 5; i++) {
counter.push(`event: e${i}\ndata: x\n\n`)
}
const metadata = counter.finalize(STREAM_CLOSED)!

expect(metadata.events.length).toBe(MAX_SSE_EVENT_NAMES + 1)
expect(countsByName(metadata.events)[SSE_OTHER_EVENT_NAME]).toBe(5)
expect(metadata.event_count).toBe(MAX_SSE_EVENT_NAMES + 5)
})

it('truncates very long event names', () => {
const counter = createSseEventCounter()
const longName = 'x'.repeat(MAX_SSE_EVENT_NAME_LENGTH + 50)
counter.push(`event: ${longName}\ndata: y\n\n`)
const metadata = counter.finalize(STREAM_CLOSED)!

expect(metadata.events[0].name.length).toBe(MAX_SSE_EVENT_NAME_LENGTH)
})

it('captures last_event_id and retry_hint', () => {
const counter = createSseEventCounter()
counter.push('id: 1\nretry: 3000\ndata: x\n\nid: 2\ndata: y\n\n')
const metadata = counter.finalize(STREAM_CLOSED)!

expect(metadata.last_event_id).toBe('2')
expect(metadata.retry_hint).toBe(3000)
})

it('merges caller-provided timing and end reason into finalize', () => {
const counter = createSseEventCounter()
counter.push('data: x\n\n')
const metadata = counter.finalize({
lastEventAt: 20 as ServerDuration,
endTime: 30 as ServerDuration,
trackingEndReason: 'byte_cap',
})!

expect(metadata.last_event_at).toBe(20 as ServerDuration)
expect(metadata.end_time).toBe(30 as ServerDuration)
expect(metadata.tracking_end_reason).toBe('byte_cap')
})
})
Loading
Loading