From c6eb079ca93545fe92937a11a6b6ff6c1df558a9 Mon Sep 17 00:00:00 2001 From: Boris Dibon Date: Fri, 19 Jun 2026 17:40:34 +0200 Subject: [PATCH 01/16] =?UTF-8?q?=E2=9C=85=20Stabilize=20Nuxt=20E2E=20apps?= =?UTF-8?q?=20with=20preview=20and=20explicit=20ports?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Run `yarn start` (nuxt preview) for both Nuxt test apps in Playwright so local runs match CI and avoid vite-node IPC flakes under parallel workers. Bind preview to fixed ports to avoid clashes with common 3000/3001 listeners. Co-authored-by: Cursor --- test/e2e/playwright.config.ts | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/test/e2e/playwright.config.ts b/test/e2e/playwright.config.ts index 9ffd74279e..cca55095dd 100644 --- a/test/e2e/playwright.config.ts +++ b/test/e2e/playwright.config.ts @@ -96,11 +96,12 @@ const baseWebServers = [ name: 'nuxt app', stdout: 'pipe' as const, cwd: path.join(__dirname, '../apps/nuxt-app'), - command: isLocal ? 'yarn dev' : 'yarn start', - env: { NO_COLOR: '1' }, + // `nuxt dev` shares one vite-node IPC server; parallel Playwright workers can trigger + // intermittent EINVAL / aborted navigations. Preview matches CI and tolerates concurrency. + command: 'yarn start', + env: { NO_COLOR: '1', PORT: '3100', NITRO_PORT: '3100' }, wait: { - // yarn dev logs: "➜ Local: http://localhost:PORT" - // yarn start logs: "Listening on http://[::]:PORT" + // yarn preview logs: "Listening on http://[::]:PORT" (or localhost in some versions) stdout: /(?:Local:\s+http:\/\/localhost|Listening on http:\/\/(?:\[[^\]]+\]|[^:]+)):(?\d+)/, }, }, @@ -108,8 +109,8 @@ const baseWebServers = [ name: 'nuxt vue router v4 app', stdout: 'pipe' as const, cwd: path.join(__dirname, '../apps/nuxt-vue-router-v4-app'), - command: isLocal ? 'yarn dev' : 'yarn start', - env: { NO_COLOR: '1', PORT: '3001', NITRO_PORT: '3001' }, + command: 'yarn start', + env: { NO_COLOR: '1', PORT: '3200', NITRO_PORT: '3200' }, wait: { stdout: /(?:Local:\s+http:\/\/localhost|Listening on http:\/\/(?:\[[^\]]+\]|[^:]+)):(?\d+)/, From 78bf9cec5eb5b84bc591a4ed7194fc93b48f1730 Mon Sep 17 00:00:00 2001 From: Boris Dibon Date: Wed, 3 Jun 2026 13:29:36 +0200 Subject: [PATCH 02/16] =?UTF-8?q?=E2=9C=A8=20Add=20webSocketObservable=20f?= =?UTF-8?q?or=20WebSocket=20lifecycle=20events?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Cursor --- .../src/browser/addEventListener.ts | 4 +- .../src/browser/webSocketObservable.spec.ts | 356 ++++++++++++++++++ .../src/browser/webSocketObservable.ts | 200 ++++++++++ packages/browser-core/src/index.ts | 9 + packages/browser-core/test/forEach.spec.ts | 2 + 5 files changed, 570 insertions(+), 1 deletion(-) create mode 100644 packages/browser-core/src/browser/webSocketObservable.spec.ts create mode 100644 packages/browser-core/src/browser/webSocketObservable.ts diff --git a/packages/browser-core/src/browser/addEventListener.ts b/packages/browser-core/src/browser/addEventListener.ts index 1fca98d267..c2e01df885 100644 --- a/packages/browser-core/src/browser/addEventListener.ts +++ b/packages/browser-core/src/browser/addEventListener.ts @@ -79,7 +79,9 @@ type EventMapFor = T extends Window ? WorkerEventMap : T extends CookieStore ? CookieStoreEventMap - : Record + : T extends WebSocket + ? WebSocketEventMap + : Record /** * Add an event listener to an event target object (Window, Element, mock object...). This provides diff --git a/packages/browser-core/src/browser/webSocketObservable.spec.ts b/packages/browser-core/src/browser/webSocketObservable.spec.ts new file mode 100644 index 0000000000..deae0833c2 --- /dev/null +++ b/packages/browser-core/src/browser/webSocketObservable.spec.ts @@ -0,0 +1,356 @@ +import { registerCleanupTask } from '../../test' +import type { Subscription } from '../tools/observable' +import type { WebSocketContext } from './webSocketObservable' +import { initWebSocketObservable, resetWebSocketObservable } from './webSocketObservable' + +// A minimal stand-in for the native `WebSocket` constructor. We do not connect to a real server in +// unit tests; instead we expose helpers to simulate the browser dispatching events on the instance. +class FakeWebSocket extends EventTarget { + static readonly CONNECTING = 0 + static readonly OPEN = 1 + static readonly CLOSING = 2 + static readonly CLOSED = 3 + + url: string + protocol = '' + bufferedAmount = 0 + readyState: number = FakeWebSocket.CONNECTING + onmessage: ((event: MessageEvent) => void) | null = null + onopen: ((event: Event) => void) | null = null + onclose: ((event: CloseEvent) => void) | null = null + + constructor(url: string | URL, protocols?: string | string[]) { + super() + this.url = String(url) + if (typeof protocols === 'string') { + this.protocol = protocols + } + } + + send(_data: string | ArrayBufferLike | Blob | ArrayBufferView): void { + // no-op; tests will set `bufferedAmount` before calling send to verify it is sampled. + } + + close(_code?: number, _reason?: string): void { + this.readyState = FakeWebSocket.CLOSED + } + + simulateOpen() { + this.readyState = FakeWebSocket.OPEN + const event = new Event('open') + this.dispatchEvent(event) + this.onopen?.(event) + } + + simulateMessage(data: unknown) { + const event = new MessageEvent('message', { data }) + this.dispatchEvent(event) + this.onmessage?.(event) + } + + simulateClose(code: number, reason: string, wasClean: boolean) { + this.readyState = FakeWebSocket.CLOSED + // CloseEvent is not always constructable in test environments; use a plain Event with assigned fields. + const event = Object.assign(new Event('close'), { code, reason, wasClean }) as CloseEvent + this.dispatchEvent(event) + this.onclose?.(event) + } +} + +type FakeWebSocketConstructor = typeof FakeWebSocket + +const windowAsWebSocketHost = window as unknown as { WebSocket: FakeWebSocketConstructor } + +describe('webSocketObservable', () => { + let originalWebSocket: FakeWebSocketConstructor + let contexts: WebSocketContext[] + let subscription: Subscription | undefined + + beforeEach(() => { + originalWebSocket = windowAsWebSocketHost.WebSocket + windowAsWebSocketHost.WebSocket = FakeWebSocket + contexts = [] + + registerCleanupTask(() => { + subscription?.unsubscribe() + subscription = undefined + resetWebSocketObservable() + windowAsWebSocketHost.WebSocket = originalWebSocket + }) + }) + + function startTracking() { + subscription = initWebSocketObservable({ allowUntrustedEvents: true }).subscribe((context) => { + contexts.push(context) + }) + } + + function getContexts(state: T) { + return contexts.filter((context): context is Extract => context.state === state) + } + + describe('when tracking is started', () => { + beforeEach(() => { + startTracking() + }) + + describe('connecting context', () => { + it('emits a "connecting" context when a WebSocket is constructed', () => { + const url = 'wss://example.com/socket' + const ws = new windowAsWebSocketHost.WebSocket(url) + + const connectingContexts = getContexts('connecting') + expect(connectingContexts.length).toBe(1) + expect(connectingContexts[0].url).toBe(url) + expect(connectingContexts[0].instance).toBe(ws as unknown as WebSocket) + expect(connectingContexts[0].startClocks.timeStamp).toEqual(jasmine.any(Number)) + }) + + it('coerces URL objects to strings in the "connecting" context', () => { + const url = 'wss://example.com/socket' + new windowAsWebSocketHost.WebSocket(new URL(url)) + + expect(getContexts('connecting')[0].url).toBe(url) + }) + + it('does not include protocols in the "connecting" context when omitted', () => { + new windowAsWebSocketHost.WebSocket('wss://example.com/socket') + + expect(getContexts('connecting')[0].protocols).toBeUndefined() + }) + + it('includes string protocols in the "connecting" context', () => { + const url = 'wss://example.com/socket' + const protocols = 'chat.v1' + new windowAsWebSocketHost.WebSocket(url, protocols) + + expect(getContexts('connecting')[0].protocols).toBe(protocols) + }) + + it('includes array protocols in the "connecting" context', () => { + const url = 'wss://example.com/socket' + const protocols = ['chat.v1', 'json'] + new windowAsWebSocketHost.WebSocket(url, protocols) + + expect(getContexts('connecting')[0].protocols).toEqual(protocols) + }) + }) + + describe('preservation of native behavior', () => { + it('does not clobber a customer-set onmessage handler', () => { + const ws = new windowAsWebSocketHost.WebSocket('wss://example.com/socket') + const customerHandler = jasmine.createSpy() + ws.onmessage = customerHandler + + ws.simulateMessage('hello') + + expect(customerHandler).toHaveBeenCalledTimes(1) + expect(getContexts('message-in').length).toBe(1) + }) + + it('does not clobber a customer-set onopen handler', () => { + const ws = new windowAsWebSocketHost.WebSocket('wss://example.com/socket') + const customerHandler = jasmine.createSpy() + ws.onopen = customerHandler + + ws.simulateOpen() + + expect(customerHandler).toHaveBeenCalledTimes(1) + expect(getContexts('open').length).toBe(1) + }) + + it('does not clobber a customer-set onclose handler', () => { + const ws = new windowAsWebSocketHost.WebSocket('wss://example.com/socket') + const customerHandler = jasmine.createSpy() + ws.onclose = customerHandler + + ws.simulateClose(1000, 'bye', true) + + expect(customerHandler).toHaveBeenCalledTimes(1) + expect(getContexts('closed').length).toBe(1) + }) + }) + + describe('open context', () => { + it('emits an "open" context when the WebSocket opens', () => { + const ws = new windowAsWebSocketHost.WebSocket('wss://example.com/socket') + const negotiatedProtocol = 'chat.v1' + ws.protocol = negotiatedProtocol + ws.simulateOpen() + + const openContexts = getContexts('open') + expect(openContexts.length).toBe(1) + expect(openContexts[0].protocol).toBe(negotiatedProtocol) + expect(openContexts[0].instance).toBe(ws as unknown as WebSocket) + expect(openContexts[0].openClocks.timeStamp).toEqual(jasmine.any(Number)) + }) + + it('emits an "open" context with empty protocol when no sub-protocol negotiated', () => { + const ws = new windowAsWebSocketHost.WebSocket('wss://example.com/socket') + ws.simulateOpen() + + const openContexts = getContexts('open') + expect(openContexts.length).toBe(1) + expect(openContexts[0].protocol).toBe('') + }) + }) + + describe('message-in context', () => { + it('emits "message-in" with byte-length size for string payloads', () => { + const ws = new windowAsWebSocketHost.WebSocket('wss://example.com/socket') + ws.simulateOpen() + const payload = 'hello world' + ws.simulateMessage(payload) + + const messageInContexts = getContexts('message-in') + expect(messageInContexts.length).toBe(1) + expect(messageInContexts[0].size).toBe(payload.length) + }) + + it('emits "message-in" with UTF-8 byte length for multi-byte strings', () => { + const ws = new windowAsWebSocketHost.WebSocket('wss://example.com/socket') + ws.simulateOpen() + // 'é' is 2 bytes in UTF-8 and 'あ' is 3 bytes; total is 5 bytes for 2 chars + const payload = 'éあ' + ws.simulateMessage(payload) + + expect(getContexts('message-in')[0].size).toBe(new TextEncoder().encode(payload).byteLength) + }) + + it('emits "message-in" with byteLength for ArrayBuffer payloads', () => { + const ws = new windowAsWebSocketHost.WebSocket('wss://example.com/socket') + ws.simulateOpen() + const byteLength = 16 + ws.simulateMessage(new ArrayBuffer(byteLength)) + + expect(getContexts('message-in')[0].size).toBe(byteLength) + }) + + it('emits "message-in" with byteLength for ArrayBufferView payloads', () => { + const ws = new windowAsWebSocketHost.WebSocket('wss://example.com/socket') + ws.simulateOpen() + const viewByteLength = 12 + ws.simulateMessage(new Uint8Array(new ArrayBuffer(32), 4, viewByteLength)) + + expect(getContexts('message-in')[0].size).toBe(viewByteLength) + }) + + it('emits "message-in" with size for Blob payloads', () => { + const ws = new windowAsWebSocketHost.WebSocket('wss://example.com/socket') + ws.simulateOpen() + const blob = new Blob(['hello']) + ws.simulateMessage(blob) + + expect(getContexts('message-in')[0].size).toBe(blob.size) + }) + }) + + describe('message-out context', () => { + it('emits "message-out" with size and bufferedAmountPreSend for string payloads', () => { + const ws = new windowAsWebSocketHost.WebSocket('wss://example.com/socket') + const bufferedAmountPreSend = 42 + ws.bufferedAmount = bufferedAmountPreSend + const payload = 'hello' + ws.send(payload) + + const messageOutContexts = getContexts('message-out') + expect(messageOutContexts.length).toBe(1) + expect(messageOutContexts[0].size).toBe(payload.length) + expect(messageOutContexts[0].bufferedAmountPreSend).toBe(bufferedAmountPreSend) + expect(messageOutContexts[0].at.timeStamp).toEqual(jasmine.any(Number)) + }) + + it('emits "message-out" with byteLength for ArrayBuffer payloads', () => { + const ws = new windowAsWebSocketHost.WebSocket('wss://example.com/socket') + const byteLength = 8 + ws.send(new ArrayBuffer(byteLength)) + + expect(getContexts('message-out')[0].size).toBe(byteLength) + }) + + it('emits "message-out" with byteLength for ArrayBufferView payloads', () => { + const ws = new windowAsWebSocketHost.WebSocket('wss://example.com/socket') + const viewByteLength = 10 + ws.send(new Uint8Array(new ArrayBuffer(20), 2, viewByteLength)) + + expect(getContexts('message-out')[0].size).toBe(viewByteLength) + }) + + it('emits "message-out" with size for Blob payloads', () => { + const ws = new windowAsWebSocketHost.WebSocket('wss://example.com/socket') + const blob = new Blob(['hello world']) + ws.send(blob) + + expect(getContexts('message-out')[0].size).toBe(blob.size) + }) + }) + + describe('closed context', () => { + it('emits a "closed" context with code, reason, and wasClean', () => { + const ws = new windowAsWebSocketHost.WebSocket('wss://example.com/socket') + const closeCode = 1000 + const closeReason = 'bye' + const wasClean = true + ws.simulateClose(closeCode, closeReason, wasClean) + + const closeContexts = getContexts('closed') + expect(closeContexts.length).toBe(1) + expect(closeContexts[0].code).toBe(closeCode) + expect(closeContexts[0].reason).toBe(closeReason) + expect(closeContexts[0].wasClean).toBe(wasClean) + expect(closeContexts[0].at.timeStamp).toEqual(jasmine.any(Number)) + }) + }) + + describe('subscription lifecycle', () => { + it('restores the native WebSocket constructor when all subscribers unsubscribe', () => { + subscription?.unsubscribe() + subscription = undefined + + expect(windowAsWebSocketHost.WebSocket).toBe(FakeWebSocket) + }) + + it('does not emit any further events after all subscribers unsubscribe', () => { + subscription?.unsubscribe() + subscription = undefined + + const ws = new windowAsWebSocketHost.WebSocket('wss://example.com/socket') + ws.simulateOpen() + ws.send('hello') + + expect(contexts.length).toBe(0) + }) + }) + }) + + describe('with conflicting allowUntrustedEvents policies across callers', () => { + it('does not emit open or message-in for untrusted events when the customer disallows them', () => { + initWebSocketObservable({ allowUntrustedEvents: true }) + subscription = initWebSocketObservable({ allowUntrustedEvents: false }).subscribe((context) => { + contexts.push(context) + }) + + const ws = new windowAsWebSocketHost.WebSocket('wss://example.com/socket') + ws.simulateOpen() + ws.simulateMessage('hello') + + expect(getContexts('connecting').length).toBe(1) + expect(getContexts('open').length).toBe(0) + expect(getContexts('message-in').length).toBe(0) + }) + + it('emits open and message-in for untrusted events when every caller allows them', () => { + initWebSocketObservable({ allowUntrustedEvents: true }) + subscription = initWebSocketObservable({ allowUntrustedEvents: true }).subscribe((context) => { + contexts.push(context) + }) + + const ws = new windowAsWebSocketHost.WebSocket('wss://example.com/socket') + ws.simulateOpen() + ws.simulateMessage('hello') + + expect(getContexts('open').length).toBe(1) + expect(getContexts('message-in').length).toBe(1) + }) + }) +}) diff --git a/packages/browser-core/src/browser/webSocketObservable.ts b/packages/browser-core/src/browser/webSocketObservable.ts new file mode 100644 index 0000000000..e42db0b1fe --- /dev/null +++ b/packages/browser-core/src/browser/webSocketObservable.ts @@ -0,0 +1,200 @@ +import type { ClocksState } from '@datadog/js-core/time' +import { clocksNow } from '@datadog/js-core/time' +import type { GlobalObject } from '../tools/globalObject' +import { globalObject } from '../tools/globalObject' +import { instrumentConstructor, instrumentMethod } from '../tools/instrumentMethod' +import { Observable } from '../tools/observable' +import { addEventListener } from './addEventListener' + +interface WebSocketObservableConfiguration { + allowUntrustedEvents?: boolean | undefined +} + +type GlobalWithWebSocket = GlobalObject & { WebSocket: typeof WebSocket } + +function isGlobalWithWebSocket(global: GlobalObject): global is GlobalWithWebSocket { + return typeof (global as { WebSocket?: unknown }).WebSocket === 'function' +} + +export interface WebSocketConnectingContext { + state: 'connecting' + instance: WebSocket + url: string + protocols?: string | string[] + startClocks: ClocksState +} + +export interface WebSocketOpenContext { + state: 'open' + instance: WebSocket + openClocks: ClocksState + protocol: string +} + +export interface WebSocketMessageInContext { + state: 'message-in' + instance: WebSocket + size: number + at: ClocksState +} + +export interface WebSocketMessageOutContext { + state: 'message-out' + instance: WebSocket + size: number + bufferedAmountPreSend: number + at: ClocksState +} + +export interface WebSocketClosedContext { + state: 'closed' + instance: WebSocket + code: number + reason: string + wasClean: boolean + at: ClocksState +} + +export type WebSocketContext = + | WebSocketConnectingContext + | WebSocketOpenContext + | WebSocketMessageInContext + | WebSocketMessageOutContext + | WebSocketClosedContext + +let webSocketObservable: Observable | undefined + +// The singleton WebSocket observable applies the latest caller's allowUntrustedEvents policy so +// that the customer's configuration overrides an early call (e.g. from bufferedData) that opts +// in before the customer config is parsed. +let allowUntrustedEvents: boolean | undefined + +export function initWebSocketObservable( + configuration: WebSocketObservableConfiguration = {} +): Observable { + if (configuration.allowUntrustedEvents !== undefined) { + allowUntrustedEvents = configuration.allowUntrustedEvents + } + + if (!webSocketObservable) { + webSocketObservable = createWebSocketObservable() + } + + return webSocketObservable +} + +function createWebSocketObservable() { + return new Observable((observable) => { + if (!isGlobalWithWebSocket(globalObject)) { + return undefined + } + + const stopListeners: Array<() => void> = [] + + const { stop: stopInstrumentingConstructor } = instrumentConstructor( + globalObject, + 'WebSocket', + ({ parameters, onPostCall }) => { + const url = String(parameters[0]) + const protocols = parameters[1] + const startClocks = clocksNow() + onPostCall((instance) => { + observable.notify({ + state: 'connecting', + instance, + url, + ...(protocols !== undefined ? { protocols } : {}), + startClocks, + }) + attachInstanceListeners(instance, observable, stopListeners) + }) + } + ) + + const { stop: stopInstrumentingSend } = instrumentMethod( + globalObject.WebSocket.prototype, + 'send', + ({ target: instance, parameters: [data], onPostCall }) => { + const size = computePayloadSize(data) + const bufferedAmountPreSend = instance.bufferedAmount + onPostCall(() => { + observable.notify({ + state: 'message-out', + instance, + size, + bufferedAmountPreSend, + at: clocksNow(), + }) + }) + } + ) + + return () => { + stopInstrumentingConstructor() + stopInstrumentingSend() + stopListeners.forEach((stop) => stop()) + stopListeners.length = 0 + } + }) +} + +function attachInstanceListeners( + instance: WebSocket, + observable: Observable, + stopListeners: Array<() => void> +) { + const { stop: stopOpen } = addEventListener({ allowUntrustedEvents }, instance, 'open', () => { + observable.notify({ + state: 'open', + instance, + openClocks: clocksNow(), + protocol: instance.protocol || '', + }) + }) + const { stop: stopMessage } = addEventListener({ allowUntrustedEvents }, instance, 'message', (event) => { + observable.notify({ + state: 'message-in', + instance, + size: computePayloadSize(event.data), + at: clocksNow(), + }) + }) + const { stop: stopClose } = addEventListener({ allowUntrustedEvents }, instance, 'close', (event) => { + observable.notify({ + state: 'closed', + instance, + code: event.code, + reason: event.reason, + wasClean: event.wasClean, + at: clocksNow(), + }) + }) + + stopListeners.push(stopOpen, stopMessage, stopClose) +} + +function computePayloadSize(data: unknown): number { + if (typeof data === 'string') { + return new TextEncoder().encode(data).byteLength + } + if (data instanceof ArrayBuffer) { + return data.byteLength + } + if (ArrayBuffer.isView(data)) { + return data.byteLength + } + if (typeof Blob !== 'undefined' && data instanceof Blob) { + return data.size + } + return 0 +} + +/** + * Reset the WebSocket observable global state. Test-only. + * + * @internal + */ +export function resetWebSocketObservable() { + webSocketObservable = undefined + allowUntrustedEvents = undefined +} diff --git a/packages/browser-core/src/index.ts b/packages/browser-core/src/index.ts index 2f832946cc..0b2ca19044 100644 --- a/packages/browser-core/src/index.ts +++ b/packages/browser-core/src/index.ts @@ -126,6 +126,15 @@ export type { XhrCompleteContext, XhrStartContext, XhrContext } from './browser/ export { initXhrObservable } from './browser/xhrObservable' export type { FetchResolveContext, FetchStartContext, FetchContext } from './browser/fetchObservable' export { initFetchObservable, ResponseBodyAction } from './browser/fetchObservable' +export type { + WebSocketContext, + WebSocketConnectingContext, + WebSocketOpenContext, + WebSocketMessageInContext, + WebSocketMessageOutContext, + WebSocketClosedContext as WebSocketCloseContext, +} from './browser/webSocketObservable' +export { initWebSocketObservable } from './browser/webSocketObservable' export { fetch } from './browser/fetch' export type { PageMayExitEvent } from './browser/pageMayExitObservable' export { createPageMayExitObservable, PageExitReason, isPageExitReason } from './browser/pageMayExitObservable' diff --git a/packages/browser-core/test/forEach.spec.ts b/packages/browser-core/test/forEach.spec.ts index f95041eb14..1c393d7960 100644 --- a/packages/browser-core/test/forEach.spec.ts +++ b/packages/browser-core/test/forEach.spec.ts @@ -4,6 +4,7 @@ import { resetValueHistoryGlobals } from '../src/tools/valueHistory' import { resetFetchObservable } from '../src/browser/fetchObservable' import { resetConsoleObservable } from '../src/domain/console/consoleObservable' import { resetXhrObservable } from '../src/browser/xhrObservable' +import { resetWebSocketObservable } from '../src/browser/webSocketObservable' import { resetGetCurrentSite } from '../src/browser/cookie' import { resetReplayStats } from '../../browser-rum/src/domain/replayStats' import { resetInteractionCountPolyfill } from '../../browser-rum-core/src/domain/view/viewMetrics/interactionCountPolyfill' @@ -34,6 +35,7 @@ afterEach(() => { resetFetchObservable() resetConsoleObservable() resetXhrObservable() + resetWebSocketObservable() resetGetCurrentSite() resetReplayStats() resetMonitor() From a57aa2e4b395cfaaa8a528045e26e138df24a452 Mon Sep 17 00:00:00 2001 From: Boris Dibon Date: Thu, 11 Jun 2026 13:19:25 +0200 Subject: [PATCH 03/16] =?UTF-8?q?=E2=9A=97=EF=B8=8F=20Add=20WebSocket=20RU?= =?UTF-8?q?M=20Resource=20types?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../browser-core/src/domain/resourceUtils.ts | 1 + .../browser-rum-core/src/rawRumEvent.types.ts | 24 +++++++++++++++++++ 2 files changed, 25 insertions(+) diff --git a/packages/browser-core/src/domain/resourceUtils.ts b/packages/browser-core/src/domain/resourceUtils.ts index f5849134c0..1d79857a1b 100644 --- a/packages/browser-core/src/domain/resourceUtils.ts +++ b/packages/browser-core/src/domain/resourceUtils.ts @@ -8,6 +8,7 @@ export const ResourceType = { IMAGE: 'image', FONT: 'font', MEDIA: 'media', + WEBSOCKET: 'websocket', OTHER: 'other', } as const diff --git a/packages/browser-rum-core/src/rawRumEvent.types.ts b/packages/browser-rum-core/src/rawRumEvent.types.ts index a115388795..22365c3a47 100644 --- a/packages/browser-rum-core/src/rawRumEvent.types.ts +++ b/packages/browser-rum-core/src/rawRumEvent.types.ts @@ -73,6 +73,7 @@ export interface RawRumResourceEvent { protocol?: string delivery_type?: DeliveryType graphql?: GraphQlMetadata + websocket?: WebSocketResourceProperties request?: ResourceRequest response?: ResourceResponse } @@ -86,6 +87,29 @@ export interface RawRumResourceEvent { context?: Context } +export interface WebSocketResourceProperties { + connection_id: string + handshake_succeeded: boolean + start_time: TimeStamp + end_time: TimeStamp + start_view_id?: string + end_view_id?: string + tracking_end_reason: 'close_event' | 'session_end' + close_code?: number + close_reason?: string + was_clean?: boolean + messages_in: { count: number; size: number } + messages_out: { count: number; size: number } + time_to_first_message_in?: Duration + time_to_first_message_out?: Duration + last_message_at?: TimeStamp + longest_silence: Duration + idle_duration_before_close?: Duration + buffered_amount_max?: number + protocol?: string + setup_duration?: Duration +} + export type NetworkHeaders = Record export interface ResourceRequest { From 9850272fe078342a8285c42536aa349d65828909 Mon Sep 17 00:00:00 2001 From: Boris Dibon Date: Wed, 3 Jun 2026 13:30:48 +0200 Subject: [PATCH 04/16] =?UTF-8?q?=E2=9C=A8=20Add=20trackWebSockets=20confi?= =?UTF-8?q?guration=20option=20(default:=20false)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Cursor --- packages/browser-core/src/tools/experimentalFeatures.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/browser-core/src/tools/experimentalFeatures.ts b/packages/browser-core/src/tools/experimentalFeatures.ts index 65c39b5ffa..1f988fba6d 100644 --- a/packages/browser-core/src/tools/experimentalFeatures.ts +++ b/packages/browser-core/src/tools/experimentalFeatures.ts @@ -16,6 +16,7 @@ import { objectHasValue } from './utils/objectUtils' export enum ExperimentalFeature { TRACK_INTAKE_REQUESTS = 'track_intake_requests', PARTIAL_VIEW_UPDATES = 'partial_view_updates', + TRACK_WEB_SOCKETS = 'track_web_sockets', } const enabledExperimentalFeatures: Set = new Set() From 96d8ebb3d04c5f7638c9761097b86b2287e402de Mon Sep 17 00:00:00 2001 From: Boris Dibon Date: Fri, 19 Jun 2026 13:53:21 +0200 Subject: [PATCH 05/16] =?UTF-8?q?=E2=9C=A8=20Collect=20WebSocket=20resourc?= =?UTF-8?q?e=20events?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Cursor --- .../src/browser/webSocketObservable.spec.ts | 35 +- .../src/browser/webSocketObservable.ts | 24 +- .../browser-rum-core/src/boot/startRum.ts | 8 + .../browser-rum-core/src/domain/lifeCycle.ts | 4 + .../src/domain/resource/resourceCollection.ts | 98 +++- .../src/domain/webSocketCollection.spec.ts | 433 ++++++++++++++++++ .../src/domain/webSocketCollection.ts | 251 ++++++++++ .../src/domainContext.types.ts | 4 +- .../browser-rum-core/src/rawRumEvent.types.ts | 6 +- .../browser-rum-core/test/formatValidation.ts | 12 +- scripts/dev-server/lib/server.ts | 1 + 11 files changed, 795 insertions(+), 81 deletions(-) create mode 100644 packages/browser-rum-core/src/domain/webSocketCollection.spec.ts create mode 100644 packages/browser-rum-core/src/domain/webSocketCollection.ts diff --git a/packages/browser-core/src/browser/webSocketObservable.spec.ts b/packages/browser-core/src/browser/webSocketObservable.spec.ts index deae0833c2..cb8989444c 100644 --- a/packages/browser-core/src/browser/webSocketObservable.spec.ts +++ b/packages/browser-core/src/browser/webSocketObservable.spec.ts @@ -1,5 +1,6 @@ import { registerCleanupTask } from '../../test' import type { Subscription } from '../tools/observable' +import { setAllowUntrustedEvents } from './addEventListener' import type { WebSocketContext } from './webSocketObservable' import { initWebSocketObservable, resetWebSocketObservable } from './webSocketObservable' @@ -80,7 +81,8 @@ describe('webSocketObservable', () => { }) function startTracking() { - subscription = initWebSocketObservable({ allowUntrustedEvents: true }).subscribe((context) => { + setAllowUntrustedEvents(true) + subscription = initWebSocketObservable().subscribe((context) => { contexts.push(context) }) } @@ -322,35 +324,4 @@ describe('webSocketObservable', () => { }) }) }) - - describe('with conflicting allowUntrustedEvents policies across callers', () => { - it('does not emit open or message-in for untrusted events when the customer disallows them', () => { - initWebSocketObservable({ allowUntrustedEvents: true }) - subscription = initWebSocketObservable({ allowUntrustedEvents: false }).subscribe((context) => { - contexts.push(context) - }) - - const ws = new windowAsWebSocketHost.WebSocket('wss://example.com/socket') - ws.simulateOpen() - ws.simulateMessage('hello') - - expect(getContexts('connecting').length).toBe(1) - expect(getContexts('open').length).toBe(0) - expect(getContexts('message-in').length).toBe(0) - }) - - it('emits open and message-in for untrusted events when every caller allows them', () => { - initWebSocketObservable({ allowUntrustedEvents: true }) - subscription = initWebSocketObservable({ allowUntrustedEvents: true }).subscribe((context) => { - contexts.push(context) - }) - - const ws = new windowAsWebSocketHost.WebSocket('wss://example.com/socket') - ws.simulateOpen() - ws.simulateMessage('hello') - - expect(getContexts('open').length).toBe(1) - expect(getContexts('message-in').length).toBe(1) - }) - }) }) diff --git a/packages/browser-core/src/browser/webSocketObservable.ts b/packages/browser-core/src/browser/webSocketObservable.ts index e42db0b1fe..77039c8874 100644 --- a/packages/browser-core/src/browser/webSocketObservable.ts +++ b/packages/browser-core/src/browser/webSocketObservable.ts @@ -6,10 +6,6 @@ import { instrumentConstructor, instrumentMethod } from '../tools/instrumentMeth import { Observable } from '../tools/observable' import { addEventListener } from './addEventListener' -interface WebSocketObservableConfiguration { - allowUntrustedEvents?: boolean | undefined -} - type GlobalWithWebSocket = GlobalObject & { WebSocket: typeof WebSocket } function isGlobalWithWebSocket(global: GlobalObject): global is GlobalWithWebSocket { @@ -64,18 +60,7 @@ export type WebSocketContext = let webSocketObservable: Observable | undefined -// The singleton WebSocket observable applies the latest caller's allowUntrustedEvents policy so -// that the customer's configuration overrides an early call (e.g. from bufferedData) that opts -// in before the customer config is parsed. -let allowUntrustedEvents: boolean | undefined - -export function initWebSocketObservable( - configuration: WebSocketObservableConfiguration = {} -): Observable { - if (configuration.allowUntrustedEvents !== undefined) { - allowUntrustedEvents = configuration.allowUntrustedEvents - } - +export function initWebSocketObservable(): Observable { if (!webSocketObservable) { webSocketObservable = createWebSocketObservable() } @@ -143,7 +128,7 @@ function attachInstanceListeners( observable: Observable, stopListeners: Array<() => void> ) { - const { stop: stopOpen } = addEventListener({ allowUntrustedEvents }, instance, 'open', () => { + const { stop: stopOpen } = addEventListener(instance, 'open', () => { observable.notify({ state: 'open', instance, @@ -151,7 +136,7 @@ function attachInstanceListeners( protocol: instance.protocol || '', }) }) - const { stop: stopMessage } = addEventListener({ allowUntrustedEvents }, instance, 'message', (event) => { + const { stop: stopMessage } = addEventListener(instance, 'message', (event) => { observable.notify({ state: 'message-in', instance, @@ -159,7 +144,7 @@ function attachInstanceListeners( at: clocksNow(), }) }) - const { stop: stopClose } = addEventListener({ allowUntrustedEvents }, instance, 'close', (event) => { + const { stop: stopClose } = addEventListener(instance, 'close', (event) => { observable.notify({ state: 'closed', instance, @@ -196,5 +181,4 @@ function computePayloadSize(data: unknown): number { */ export function resetWebSocketObservable() { webSocketObservable = undefined - allowUntrustedEvents = undefined } diff --git a/packages/browser-rum-core/src/boot/startRum.ts b/packages/browser-rum-core/src/boot/startRum.ts index 70923695e0..32a324d808 100644 --- a/packages/browser-rum-core/src/boot/startRum.ts +++ b/packages/browser-rum-core/src/boot/startRum.ts @@ -17,6 +17,8 @@ import { startGlobalContext, startUserContext, startTabContext, + isExperimentalFeatureEnabled, + ExperimentalFeature, } from '@datadog/browser-core' import { createDOMMutationObservable } from '../browser/domMutationObservable' import { createWindowOpenObservable } from '../browser/windowOpenObservable' @@ -24,6 +26,7 @@ import { startInternalContext } from '../domain/contexts/internalContext' import { LifeCycle, LifeCycleEventType } from '../domain/lifeCycle' import { startViewHistory } from '../domain/contexts/viewHistory' import { startRequestCollection } from '../domain/requestCollection' +import { startWebSocketCollection } from '../domain/webSocketCollection' import { startActionCollection } from '../domain/action/actionCollection' import { startErrorCollection } from '../domain/error/errorCollection' import { startResourceCollection } from '../domain/resource/resourceCollection' @@ -230,6 +233,11 @@ export function startRumEventCollection( const vitalCollection = startVitalCollection(lifeCycle, pageStateHistory) + if (isExperimentalFeatureEnabled(ExperimentalFeature.TRACK_WEB_SOCKETS)) { + const webSocketCollection = startWebSocketCollection(lifeCycle, viewHistory, vitalCollection.addDurationVital) + cleanupTasks.push(webSocketCollection.stop) + } + const internalContext = startInternalContext( configuration.applicationId, sessionManager, diff --git a/packages/browser-rum-core/src/domain/lifeCycle.ts b/packages/browser-rum-core/src/domain/lifeCycle.ts index 4e9e0626b2..8a7c0e81a6 100644 --- a/packages/browser-rum-core/src/domain/lifeCycle.ts +++ b/packages/browser-rum-core/src/domain/lifeCycle.ts @@ -4,6 +4,7 @@ import { AbstractLifeCycle } from '@datadog/browser-core' import type { RumEventDomainContext } from '../domainContext.types' import type { RawRumEvent, AssembledRumEvent } from '../rawRumEvent.types' import type { RequestCompleteEvent, RequestStartEvent } from './requestCollection' +import type { WebSocketCompleteEvent } from './webSocketCollection' import type { AutoAction } from './action/actionCollection' import type { ViewEvent, ViewCreatedEvent, ViewEndedEvent, BeforeViewUpdateEvent } from './view/trackViews' import type { DurationVitalStart } from './vital/vitalCollection' @@ -22,6 +23,7 @@ export const enum LifeCycleEventType { AFTER_VIEW_ENDED, REQUEST_STARTED, REQUEST_COMPLETED, + WEBSOCKET_COMPLETED, // The SESSION_EXPIRED lifecycle event has been introduced to represent when a session has expired // and trigger cleanup tasks related to this, prior to renewing the session. Its implementation is @@ -67,6 +69,7 @@ declare const LifeCycleEventTypeAsConst: { AFTER_VIEW_ENDED: LifeCycleEventType.AFTER_VIEW_ENDED REQUEST_STARTED: LifeCycleEventType.REQUEST_STARTED REQUEST_COMPLETED: LifeCycleEventType.REQUEST_COMPLETED + WEBSOCKET_COMPLETED: LifeCycleEventType.WEBSOCKET_COMPLETED SESSION_EXPIRED: LifeCycleEventType.SESSION_EXPIRED SESSION_RENEWED: LifeCycleEventType.SESSION_RENEWED PAGE_MAY_EXIT: LifeCycleEventType.PAGE_MAY_EXIT @@ -89,6 +92,7 @@ export interface LifeCycleEventMap { [LifeCycleEventTypeAsConst.AFTER_VIEW_ENDED]: ViewEndedEvent [LifeCycleEventTypeAsConst.REQUEST_STARTED]: RequestStartEvent [LifeCycleEventTypeAsConst.REQUEST_COMPLETED]: RequestCompleteEvent + [LifeCycleEventTypeAsConst.WEBSOCKET_COMPLETED]: WebSocketCompleteEvent [LifeCycleEventTypeAsConst.SESSION_EXPIRED]: void [LifeCycleEventTypeAsConst.SESSION_RENEWED]: void [LifeCycleEventTypeAsConst.PAGE_MAY_EXIT]: PageMayExitEvent diff --git a/packages/browser-rum-core/src/domain/resource/resourceCollection.ts b/packages/browser-rum-core/src/domain/resource/resourceCollection.ts index 1171de8d74..00bbf7ed3c 100644 --- a/packages/browser-rum-core/src/domain/resource/resourceCollection.ts +++ b/packages/browser-rum-core/src/domain/resource/resourceCollection.ts @@ -1,47 +1,49 @@ -import type { Duration } from '@datadog/js-core/time' -import { toServerDuration, relativeToClocks } from '@datadog/js-core/time' import { - generateUUID, + addTelemetryDebug, createTaskQueue, + display, + generateUUID, + matchList, mockable, + RequestType, + ResourceType, runOnReadyState, - matchList, safeTruncate, - display, - addTelemetryDebug, - RequestType, setTimeout, } from '@datadog/browser-core' -import { combine } from '@datadog/js-core/util' -import type { MatchHeader, RumConfiguration } from '../configuration' -import { RumPerformanceEntryType, createPerformanceObservable } from '../../browser/performanceObservable' +import type { Duration } from '@datadog/js-core/time' +import { elapsed, relativeToClocks, toServerDuration } from '@datadog/js-core/time' +import { createPerformanceObservable, RumPerformanceEntryType } from '../../browser/performanceObservable' +import { getNavigationEntry } from '../../browser/performanceUtils' import type { RumResourceEventDomainContext } from '../../domainContext.types' import type { NetworkHeaders, RawRumResourceEvent, ResourceRequest, ResourceResponse } from '../../rawRumEvent.types' import { RumEventType } from '../../rawRumEvent.types' -import type { RawRumEventCollectedData, LifeCycle } from '../lifeCycle' +import type { MatchHeader, RumConfiguration } from '../configuration' +import { startEventTracker } from '../eventTracker' +import { extractRegexMatch } from '../extractRegexMatch' +import type { LifeCycle, RawRumEventCollectedData } from '../lifeCycle' import { LifeCycleEventType } from '../lifeCycle' import type { RequestCompleteEvent } from '../requestCollection' -import { createSpanIdentifier } from '../tracing/identifier' import { getDocumentTraceId } from '../tracing/getDocumentTraceId' -import { getNavigationEntry } from '../../browser/performanceUtils' -import { startEventTracker } from '../eventTracker' -import { extractRegexMatch } from '../extractRegexMatch' +import { createSpanIdentifier } from '../tracing/identifier' +import type { WebSocketCompleteEvent } from '../webSocketCollection' +import type { GraphQlMetadata } from './graphql' +import { extractGraphQlMetadata, findGraphQlConfiguration } from './graphql' +import { createRequestRegistry } from './requestRegistry' +import type { ResourceLikeEntry } from './resourceUtils' import { + computeResourceEntryDeliveryType, computeResourceEntryDetails, computeResourceEntryDuration, - computeResourceEntryType, - computeResourceEntrySize, computeResourceEntryProtocol, - computeResourceEntryDeliveryType, + computeResourceEntrySize, + computeResourceEntryType, isResourceEntryRequestType, sanitizeIfLongDataUrl, } from './resourceUtils' -import type { ResourceLikeEntry } from './resourceUtils' -import { createRequestRegistry } from './requestRegistry' -import type { GraphQlMetadata } from './graphql' -import { extractGraphQlMetadata, findGraphQlConfiguration } from './graphql' import type { ManualResourceData } from './trackManualResources' import { trackManualResources } from './trackManualResources' +import { combine } from '@datadog/js-core/util' // Delay before looking up the request matching a request-type performance entry. See the call // site in `startResourceCollection` for the rationale. @@ -51,6 +53,9 @@ export function startResourceCollection(lifeCycle: LifeCycle, configuration: Rum const taskQueue = mockable(createTaskQueue)() const requestRegistry = createRequestRegistry(lifeCycle) + lifeCycle.subscribe(LifeCycleEventType.WEBSOCKET_COMPLETED, (event: WebSocketCompleteEvent) => { + handleResource(() => assembleWebSocketResource(event)) + }) const performanceResourceSubscription = createPerformanceObservable({ type: RumPerformanceEntryType.RESOURCE, buffered: true, @@ -108,6 +113,55 @@ export function startResourceCollection(lifeCycle: LifeCycle, configuration: Rum } } +function assembleWebSocketResource( + event: WebSocketCompleteEvent +): RawRumEventCollectedData | undefined { + const duration = elapsed(event.startClocks.timeStamp, event.endClocks.timeStamp) + + const rawRumEvent: RawRumResourceEvent = { + date: event.startClocks.timeStamp, + type: RumEventType.RESOURCE, + resource: { + id: generateUUID(), + type: ResourceType.WEBSOCKET, + url: event.url, + duration: toServerDuration(duration), + websocket: { + connection_id: event.connectionId, + handshake_succeeded: event.handshakeSucceeded, + start_time: event.startClocks.timeStamp, + end_time: event.endClocks.timeStamp, + start_view_id: event.startViewId, + end_view_id: event.endViewId, + tracking_end_reason: event.trackingEndReason, + close_code: event.closeCode, + close_reason: event.closeReason, + was_clean: event.wasClean, + messages_in: event.messagesIn, + messages_out: event.messagesOut, + time_to_first_message_in: event.firstMessageInOffset, + time_to_first_message_out: event.firstMessageOutOffset, + last_message_in_at: event.lastMessageInAt, + longest_inbound_silence: event.longestInboundSilence, + inbound_idle_duration_before_close: event.inboundIdleDurationBeforeClose, + buffered_amount_max: event.bufferedAmountMax, + protocol: event.protocol, + setup_duration: event.setupDuration, + }, + }, + _dd: { + discarded: false, + }, + } + + return { + startClocks: event.startClocks, + duration, + rawRumEvent, + domainContext: {}, + } +} + function assembleResource( entry: ResourceLikeEntry, request: RequestCompleteEvent | undefined, diff --git a/packages/browser-rum-core/src/domain/webSocketCollection.spec.ts b/packages/browser-rum-core/src/domain/webSocketCollection.spec.ts new file mode 100644 index 0000000000..49c02421b8 --- /dev/null +++ b/packages/browser-rum-core/src/domain/webSocketCollection.spec.ts @@ -0,0 +1,433 @@ +import type { WebSocketContext } from '@datadog/browser-core' +import { initWebSocketObservable, Observable } from '@datadog/browser-core' +import { registerCleanupTask } from '@datadog/browser-core/test' +import type { ClocksState, Duration, RelativeTime } from '@datadog/js-core/time' +import { elapsed, relativeToClocks } from '@datadog/js-core/time' +import { mockViewHistory } from '../../test' +import { VitalType } from '../rawRumEvent.types' +import type { ViewHistoryEntry } from './contexts/viewHistory' +import { LifeCycle, LifeCycleEventType } from './lifeCycle' +import type { DurationVital } from './vital/vitalCollection' +import type { WebSocketCompleteEvent } from './webSocketCollection' +import { startWebSocketCollection, trackWebSocket, WEBSOCKET_CONNECTING_VITAL_NAME } from './webSocketCollection' + +describe('webSocketCollection', () => { + let lifeCycle: LifeCycle + let wsObservable: Observable + let completed: WebSocketCompleteEvent[] + let wsInstance: WebSocket + + beforeEach(() => { + lifeCycle = new LifeCycle() + wsObservable = new Observable() + completed = [] + wsInstance = {} as WebSocket + lifeCycle.subscribe(LifeCycleEventType.WEBSOCKET_COMPLETED, (webSocket) => { + completed.push(webSocket) + }) + }) + + function startTracking( + viewHistory = mockViewHistory(), + addDurationVital: (vital: DurationVital) => void = jasmine.createSpy() + ) { + return trackWebSocket(lifeCycle, wsObservable, viewHistory, addDurationVital) + } + + function notifyConnecting( + startRelative = 0, + url = 'wss://example.com/socket', + startClocks?: ClocksState, + protocols?: string | string[] + ) { + wsObservable.notify({ + state: 'connecting', + instance: wsInstance, + url, + ...(protocols !== undefined ? { protocols } : {}), + startClocks: startClocks ?? relativeToClocks(startRelative as RelativeTime), + }) + } + + function notifyOpen(openRelative = 10, protocol = '', openClocks?: ClocksState) { + wsObservable.notify({ + state: 'open', + instance: wsInstance, + openClocks: openClocks ?? relativeToClocks(openRelative as RelativeTime), + protocol, + }) + } + + function notifyMessageIn(at: number, size: number) { + wsObservable.notify({ state: 'message-in', instance: wsInstance, size, at: relativeToClocks(at as RelativeTime) }) + } + + function notifyMessageOut(at: number, size: number, bufferedAmountPreSend = 0) { + wsObservable.notify({ + state: 'message-out', + instance: wsInstance, + size, + bufferedAmountPreSend, + at: relativeToClocks(at as RelativeTime), + }) + } + + function notifyClosed(at: number, code: number, reason: string, wasClean: boolean, atClocks?: ClocksState) { + wsObservable.notify({ + state: 'closed', + instance: wsInstance, + code, + reason, + wasClean, + at: atClocks ?? relativeToClocks(at as RelativeTime), + }) + } + + describe('handshakeSucceeded', () => { + it('is true when the open event fired before completion', () => { + const tracker = startTracking() + + notifyConnecting() + notifyOpen(10) + notifyClosed(40, 1000, 'bye', true) + expect(completed[0].handshakeSucceeded).toBeTrue() + + notifyConnecting() + notifyOpen(10) + tracker.flushOpenConnections('session_end') + expect(completed[1].handshakeSucceeded).toBeTrue() + }) + + it('is false when the open event never fired before completion', () => { + const tracker = startTracking() + notifyConnecting() + notifyClosed(25, 1006, 'abnormal', false) + expect(completed[0].handshakeSucceeded).toBeFalse() + + notifyConnecting() + tracker.flushOpenConnections('session_end') + expect(completed[1].handshakeSucceeded).toBeFalse() + }) + }) + + it('emits a completed event on close with tracking_end_reason="close_event"', () => { + const url = 'wss://example.com/socket' + const protocol = 'chat.v1' + const messageInSize = 100 + const messageOutSize = 50 + const bufferedAmount = 8 + const closeCode = 1000 + const closeReason = 'bye' + + startTracking() + notifyConnecting(0, url) + notifyOpen(10, protocol) + notifyMessageIn(20, messageInSize) + notifyMessageOut(30, messageOutSize, bufferedAmount) + notifyClosed(40, closeCode, closeReason, true) + + expect(completed.length).toBe(1) + const webSocket = completed[0] + + expect(webSocket.trackingEndReason).toBe('close_event') + expect(webSocket.closeCode).toBe(closeCode) + expect(webSocket.closeReason).toBe(closeReason) + expect(webSocket.wasClean).toBeTrue() + expect(webSocket.url).toBe(url) + expect(webSocket.protocol).toBe(protocol) + expect(webSocket.messagesIn).toEqual({ count: 1, size: messageInSize }) + expect(webSocket.messagesOut).toEqual({ count: 1, size: messageOutSize }) + expect(webSocket.bufferedAmountMax).toBe(bufferedAmount) + }) + + it('generates a unique connection_id per connection', () => { + startTracking() + notifyConnecting() + notifyClosed(1, 1000, 'reason_a', true) + const firstId = completed[0].connectionId + + wsInstance = {} as WebSocket + notifyConnecting() + notifyClosed(1, 1000, 'reason_b', true) + + expect(completed[1].connectionId).not.toBe(firstId) + }) + + it('records firstMessageInOffset / firstMessageOutOffset as offsets from open', () => { + const openAt = 10 + const firstMessageInAt = 13 + const firstMessageOutAt = 17 + + startTracking() + notifyConnecting() + notifyOpen(openAt) + notifyMessageIn(firstMessageInAt, 1) + notifyMessageIn(25, 1) // not first; should not update + notifyMessageOut(firstMessageOutAt, 1) + notifyClosed(30, 1000, 'bye', true) + + const webSocket = completed[0] + expect(webSocket.firstMessageInOffset).toBe((firstMessageInAt - openAt) as Duration) + expect(webSocket.firstMessageOutOffset).toBe((firstMessageOutAt - openAt) as Duration) + }) + + it('tracks longestInboundSilence from consecutive message-in only', () => { + startTracking() + notifyConnecting() + notifyOpen(10) + notifyMessageIn(20, 1) + notifyMessageIn(50, 1) // gap 30 + notifyMessageIn(75, 1) // gap 25 + notifyClosed(100, 1000, 'bye', true) + + expect(completed[0].longestInboundSilence).toBe(30 as Duration) + }) + + it('ignores message-out when computing longestInboundSilence', () => { + startTracking() + notifyConnecting() + notifyOpen(10) + notifyMessageIn(20, 1) + notifyMessageOut(100, 1) + notifyMessageIn(130, 1) // gap from last in (20) to 130 + notifyClosed(200, 1000, 'bye', true) + + expect(completed[0].longestInboundSilence).toBe(110 as Duration) + }) + + it('records inboundIdleDurationBeforeClose from last message-in to close', () => { + const lastMessageInAt = 20 + const closeAt = 50 + + startTracking() + notifyConnecting() + notifyOpen(10) + notifyMessageIn(lastMessageInAt, 1) + notifyClosed(closeAt, 1000, 'bye', true) + + expect(completed[0].inboundIdleDurationBeforeClose).toBe((closeAt - lastMessageInAt) as Duration) + }) + + it('leaves inboundIdleDurationBeforeClose and lastMessageInAt undefined when no message was received', () => { + startTracking() + notifyConnecting() + notifyOpen(10) + notifyMessageOut(30, 1) + notifyClosed(50, 1000, 'bye', true) + + expect(completed[0].lastMessageInAt).toBeUndefined() + expect(completed[0].inboundIdleDurationBeforeClose).toBeUndefined() + }) + + it('records setupDuration as elapsed time from connecting to open', () => { + const startAt = 0 as RelativeTime + const openAt = 10 as RelativeTime + const startClocks = relativeToClocks(startAt) + const openClocks = relativeToClocks(openAt) + const expectedSetupDuration = elapsed(startClocks.timeStamp, openClocks.timeStamp) + + startTracking() + notifyConnecting(startAt, 'wss://example.com/socket', startClocks) + notifyOpen(openAt, '', openClocks) + notifyClosed(40, 1000, 'bye', true) + + expect(completed[0].setupDuration).toBe(expectedSetupDuration) + }) + + it('records setupDuration as elapsed time from connecting to close when open never fires', () => { + const startAt = 0 as RelativeTime + const closeAt = 25 as RelativeTime + const closeCode = 1006 + const closeReason = 'abnormal' + const startClocks = relativeToClocks(startAt) + const closeClocks = relativeToClocks(closeAt) + const expectedSetupDuration = elapsed(startClocks.timeStamp, closeClocks.timeStamp) + + startTracking() + notifyConnecting(startAt, 'wss://example.com/socket', startClocks) + notifyClosed(closeAt, closeCode, closeReason, false, closeClocks) + + expect(completed[0].setupDuration).toBe(expectedSetupDuration) + }) + + it('records setupDuration on session_end flush when the connection never opened', () => { + const tracker = startTracking() + notifyConnecting() + tracker.flushOpenConnections('session_end') + + const webSocket = completed[0] + expect(webSocket.setupDuration).toBe(elapsed(webSocket.startClocks.timeStamp, webSocket.endClocks.timeStamp)) + }) + + it('collects buffered_amount_max from message-out events', () => { + const peakBufferedAmount = 100 + + startTracking() + notifyConnecting() + notifyOpen(10) + notifyMessageOut(20, 1, 10) + notifyMessageOut(30, 1, peakBufferedAmount) + notifyMessageOut(40, 1, 50) + notifyClosed(50, 1000, 'bye', true) + + expect(completed[0].bufferedAmountMax).toBe(peakBufferedAmount) + }) + + it('captures startViewId and endViewId from viewHistory', () => { + const startViewA = 0 as RelativeTime + const startViewB = 100 as RelativeTime + const viewByRelative: Record = { + [startViewA]: { id: 'view-A', startClocks: relativeToClocks(startViewA) }, + [startViewB]: { id: 'view-B', startClocks: relativeToClocks(startViewB) }, + } + const viewHistory = mockViewHistory() + spyOn(viewHistory, 'findView').and.callFake((startTime?: RelativeTime) => + startTime !== undefined ? viewByRelative[startTime as number] : undefined + ) + + startTracking(viewHistory) + notifyConnecting() + notifyClosed(startViewB, 1000, 'bye', true) + + const webSocket = completed[0] + expect(webSocket.startViewId).toBe('view-A') + expect(webSocket.endViewId).toBe('view-B') + }) + + it('flushOpenConnections finalizes still-open connections with tracking_end_reason="session_end"', () => { + const tracker = startTracking() + notifyConnecting() + notifyOpen(10) + notifyMessageIn(20, 1) + + tracker.flushOpenConnections('session_end') + + expect(completed.length).toBe(1) + expect(completed[0].trackingEndReason).toBe('session_end') + expect(completed[0].handshakeSucceeded).toBeTrue() + expect(completed[0].closeCode).toBeUndefined() + expect(completed[0].closeReason).toBeUndefined() + expect(completed[0].wasClean).toBeUndefined() + }) + + it('does not finalize twice when close arrives after flushOpenConnections', () => { + const tracker = startTracking() + notifyConnecting() + notifyOpen(10) + tracker.flushOpenConnections('session_end') + notifyClosed(20, 1000, 'bye', true) + + expect(completed.length).toBe(1) + expect(completed[0].trackingEndReason).toBe('session_end') + }) + + it('stop() unsubscribes from the observable and ignores further events', () => { + const tracker = startTracking() + notifyConnecting() + tracker.stop() + notifyClosed(20, 1000, 'bye', true) + + expect(completed.length).toBe(0) + }) + + describe('websocket-connecting vital', () => { + it('emits a duration-0 vital on connecting', () => { + const addDurationVital = jasmine.createSpy<(vital: DurationVital) => void>() + startTracking(mockViewHistory(), addDurationVital) + notifyConnecting() + + expect(addDurationVital).toHaveBeenCalledOnceWith( + jasmine.objectContaining({ + name: WEBSOCKET_CONNECTING_VITAL_NAME, + type: VitalType.DURATION, + duration: 0, + }) + ) + }) + + it('uses the same id as the subsequent WEBSOCKET_COMPLETED connectionId', () => { + const addDurationVital = jasmine.createSpy<(vital: DurationVital) => void>() + startTracking(mockViewHistory(), addDurationVital) + notifyConnecting() + notifyClosed(1, 1000, 'bye', true) + + expect(addDurationVital).toHaveBeenCalledOnceWith(jasmine.objectContaining({ id: completed[0].connectionId })) + }) + + it('includes url, protocols, and startViewId in the vital context', () => { + const startView = 0 as RelativeTime + const viewByRelative: Record = { + [startView]: { id: 'view-start', startClocks: relativeToClocks(startView) }, + } + const viewHistory = mockViewHistory() + spyOn(viewHistory, 'findView').and.callFake((startTime?: RelativeTime) => + startTime !== undefined ? viewByRelative[startTime as number] : undefined + ) + const addDurationVital = jasmine.createSpy<(vital: DurationVital) => void>() + const url = 'wss://example.com/socket' + const protocols = ['chat.v1', 'json'] + + startTracking(viewHistory, addDurationVital) + notifyConnecting(startView, url, undefined, protocols) + + expect(addDurationVital).toHaveBeenCalledOnceWith( + jasmine.objectContaining({ + context: { + url, + protocols, + startViewId: 'view-start', + }, + }) + ) + }) + }) + + describe('startWebSocketCollection', () => { + const wsInstance = {} as WebSocket + const wsUrl = 'wss://example.com/socket' + + function notifyConnectionConnecting(startRelative = 0 as RelativeTime) { + // initWebSocketObservable returns a singleton, it will notify the same instance for all calls + initWebSocketObservable().notify({ + state: 'connecting', + instance: wsInstance, + url: wsUrl, + startClocks: relativeToClocks(startRelative), + }) + } + + it('finalizes open connections with tracking_end_reason="session_end" when the session expires', () => { + const collection = startWebSocketCollection(lifeCycle, mockViewHistory(), jasmine.createSpy()) + registerCleanupTask(() => collection.stop()) + notifyConnectionConnecting() + + lifeCycle.notify(LifeCycleEventType.SESSION_EXPIRED) + + expect(completed.length).toBe(1) + expect(completed[0].trackingEndReason).toBe('session_end') + expect(completed[0].handshakeSucceeded).toBeFalse() + expect(completed[0].closeCode).toBeUndefined() + expect(completed[0].closeReason).toBeUndefined() + expect(completed[0].wasClean).toBeUndefined() + }) + + it('ignores further WebSocket events from the same instance after stop()', () => { + const collection = startWebSocketCollection(lifeCycle, mockViewHistory(), jasmine.createSpy()) + notifyConnectionConnecting() + collection.stop() + + const eventCountAfterStop = completed.length + + initWebSocketObservable().notify({ + state: 'closed', + instance: wsInstance, + code: 1000, + reason: 'bye', + wasClean: true, + at: relativeToClocks(1000 as RelativeTime), + }) + + expect(completed.length).toBe(eventCountAfterStop) + }) + }) +}) diff --git a/packages/browser-rum-core/src/domain/webSocketCollection.ts b/packages/browser-rum-core/src/domain/webSocketCollection.ts new file mode 100644 index 0000000000..2591737a0c --- /dev/null +++ b/packages/browser-rum-core/src/domain/webSocketCollection.ts @@ -0,0 +1,251 @@ +import type { Observable, WebSocketContext } from '@datadog/browser-core' +import { generateUUID, initWebSocketObservable, sanitize } from '@datadog/browser-core' +import type { ClocksState, Duration, TimeStamp } from '@datadog/js-core/time' +import { clocksNow, elapsed } from '@datadog/js-core/time' +import { VitalType } from '../rawRumEvent.types' +import type { ViewHistory } from './contexts/viewHistory' +import type { LifeCycle } from './lifeCycle' +import { LifeCycleEventType } from './lifeCycle' +import type { DurationVital } from './vital/vitalCollection' + +export const WEBSOCKET_CONNECTING_VITAL_NAME = 'websocket-connecting' + +export type WebSocketTrackingEndReason = 'close_event' | 'session_end' + +export interface WebSocketCompleteEvent { + connectionId: string + url: string + protocol?: string + startClocks: ClocksState + endClocks: ClocksState + startViewId?: string + endViewId?: string + messagesIn: { count: number; size: number } + messagesOut: { count: number; size: number } + firstMessageInOffset?: Duration + firstMessageOutOffset?: Duration + lastMessageInAt?: TimeStamp + longestInboundSilence: Duration + bufferedAmountMax: number + inboundIdleDurationBeforeClose?: Duration + closeCode?: number + closeReason?: string + wasClean?: boolean + handshakeSucceeded: boolean + trackingEndReason: WebSocketTrackingEndReason + setupDuration?: Duration +} + +interface WebSocketConnection { + connectionId: string + url: string + protocol?: string + startClocks: ClocksState + openClocks?: ClocksState + startViewId?: string + messagesIn: { count: number; size: number } + messagesOut: { count: number; size: number } + firstMessageInOffset?: Duration + firstMessageOutOffset?: Duration + lastMessageInAt?: TimeStamp + longestInboundSilence: Duration + bufferedAmountMax: number + setupDuration?: Duration +} + +export interface WebSocketConnectionTracker { + flushOpenConnections: (reason: WebSocketTrackingEndReason) => void + stop: () => void +} + +export function startWebSocketCollection( + lifeCycle: LifeCycle, + viewHistory: ViewHistory, + addDurationVital: (vital: DurationVital) => void +) { + const tracker = trackWebSocket(lifeCycle, initWebSocketObservable(), viewHistory, addDurationVital) + + const sessionExpiredSubscription = lifeCycle.subscribe(LifeCycleEventType.SESSION_EXPIRED, () => { + tracker.flushOpenConnections('session_end') + }) + + return { + stop: () => { + sessionExpiredSubscription.unsubscribe() + tracker.flushOpenConnections('session_end') + tracker.stop() + }, + } +} + +export function trackWebSocket( + lifeCycle: LifeCycle, + webSocketContextObservable: Observable, + viewHistory: ViewHistory, + addDurationVital: (vital: DurationVital) => void +): WebSocketConnectionTracker { + const webSocketRegistry = new Map() + + const subscription = webSocketContextObservable.subscribe((context) => { + switch (context.state) { + case 'connecting': { + const connectionId = generateUUID() + const startViewId = viewHistory.findView(context.startClocks.relative)?.id + const webSocket: WebSocketConnection = { + connectionId, + url: context.url, + startClocks: context.startClocks, + startViewId, + messagesIn: { count: 0, size: 0 }, + messagesOut: { count: 0, size: 0 }, + longestInboundSilence: 0 as Duration, + bufferedAmountMax: 0, + } + webSocketRegistry.set(context.instance, webSocket) + + addDurationVital({ + id: connectionId, + name: WEBSOCKET_CONNECTING_VITAL_NAME, + type: VitalType.DURATION, + startClocks: context.startClocks, + duration: 0 as Duration, + context: sanitize({ + url: context.url, + ...(context.protocols !== undefined ? { protocols: context.protocols } : {}), + ...(startViewId !== undefined ? { startViewId } : {}), + }), + }) + return + } + + case 'open': { + const webSocket = webSocketRegistry.get(context.instance) + if (!webSocket) { + return + } + + webSocket.openClocks = context.openClocks + webSocket.protocol = context.protocol + webSocket.setupDuration = elapsed(webSocket.startClocks.timeStamp, context.openClocks.timeStamp) + + return + } + + case 'message-in': { + const webSocket = webSocketRegistry.get(context.instance) + if (!webSocket) { + return + } + + webSocket.messagesIn.count += 1 + webSocket.messagesIn.size += context.size + recordMessageTiming(webSocket, context.at, 'in') + + return + } + + case 'message-out': { + const webSocket = webSocketRegistry.get(context.instance) + if (!webSocket) { + return + } + + if (context.bufferedAmountPreSend > webSocket.bufferedAmountMax) { + webSocket.bufferedAmountMax = context.bufferedAmountPreSend + } + webSocket.messagesOut.count += 1 + webSocket.messagesOut.size += context.size + recordMessageTiming(webSocket, context.at, 'out') + + return + } + + case 'closed': { + const webSocket = webSocketRegistry.get(context.instance) + if (!webSocket) { + return + } + + webSocketRegistry.delete(context.instance) + + lifeCycle.notify( + LifeCycleEventType.WEBSOCKET_COMPLETED, + buildCompletedEvent(webSocket, context, 'close_event', viewHistory) + ) + + return + } + } + }) + + return { + flushOpenConnections: (reason) => { + const at = clocksNow() + + webSocketRegistry.forEach((webSocket) => { + lifeCycle.notify( + LifeCycleEventType.WEBSOCKET_COMPLETED, + buildCompletedEvent(webSocket, { at }, reason, viewHistory) + ) + }) + + webSocketRegistry.clear() + }, + stop: () => { + subscription.unsubscribe() + webSocketRegistry.clear() + }, + } +} + +function recordMessageTiming(webSocket: WebSocketConnection, at: ClocksState, direction: 'in' | 'out') { + const handshakeFailed = webSocket.openClocks === undefined + + if (handshakeFailed) { + return + } + + const offset = elapsed(webSocket.openClocks!.timeStamp, at.timeStamp) + if (direction === 'in' && webSocket.firstMessageInOffset === undefined) { + webSocket.firstMessageInOffset = offset + } else if (direction === 'out' && webSocket.firstMessageOutOffset === undefined) { + webSocket.firstMessageOutOffset = offset + } + + if (direction === 'in') { + if (webSocket.lastMessageInAt !== undefined) { + const gap = elapsed(webSocket.lastMessageInAt, at.timeStamp) + if (gap > webSocket.longestInboundSilence) { + webSocket.longestInboundSilence = gap + } + } + webSocket.lastMessageInAt = at.timeStamp + } +} + +function buildCompletedEvent( + webSocket: WebSocketConnection, + endInfo: { at: ClocksState; code?: number; reason?: string; wasClean?: boolean }, + trackingEndReason: WebSocketTrackingEndReason, + viewHistory: ViewHistory +): WebSocketCompleteEvent { + const endClocks = endInfo.at + const endViewId = viewHistory.findView(endClocks.relative)?.id + const inboundIdleDurationBeforeClose = + webSocket.lastMessageInAt !== undefined ? elapsed(webSocket.lastMessageInAt, endClocks.timeStamp) : undefined + + const { openClocks, ...rest } = webSocket + + return { + ...rest, + endClocks, + endViewId, + inboundIdleDurationBeforeClose, + trackingEndReason, + handshakeSucceeded: openClocks !== undefined, + setupDuration: webSocket.setupDuration ?? elapsed(webSocket.startClocks.timeStamp, endClocks.timeStamp), + closeCode: endInfo.code, + closeReason: endInfo.reason, + wasClean: endInfo.wasClean, + } +} diff --git a/packages/browser-rum-core/src/domainContext.types.ts b/packages/browser-rum-core/src/domainContext.types.ts index 9b18e87a47..5d81c93d74 100644 --- a/packages/browser-rum-core/src/domainContext.types.ts +++ b/packages/browser-rum-core/src/domainContext.types.ts @@ -9,7 +9,7 @@ export type RumEventDomainContext = T extends type : T extends typeof RumEventType.ACTION ? RumActionEventDomainContext : T extends typeof RumEventType.RESOURCE - ? RumResourceEventDomainContext | RumManualResourceEventDomainContext + ? RumResourceEventDomainContext | RumManualResourceEventDomainContext | RumWebSocketResourceEventDomainContext : T extends typeof RumEventType.ERROR ? RumErrorEventDomainContext : T extends typeof RumEventType.LONG_TASK @@ -48,6 +48,8 @@ export interface RumManualResourceEventDomainContext { isManual: true } +export type RumWebSocketResourceEventDomainContext = Record + export interface RumErrorEventDomainContext { error: unknown handlingStack?: string diff --git a/packages/browser-rum-core/src/rawRumEvent.types.ts b/packages/browser-rum-core/src/rawRumEvent.types.ts index 22365c3a47..03e8650194 100644 --- a/packages/browser-rum-core/src/rawRumEvent.types.ts +++ b/packages/browser-rum-core/src/rawRumEvent.types.ts @@ -102,9 +102,9 @@ export interface WebSocketResourceProperties { messages_out: { count: number; size: number } time_to_first_message_in?: Duration time_to_first_message_out?: Duration - last_message_at?: TimeStamp - longest_silence: Duration - idle_duration_before_close?: Duration + last_message_in_at?: TimeStamp + longest_inbound_silence: Duration + inbound_idle_duration_before_close?: Duration buffered_amount_max?: number protocol?: string setup_duration?: Duration diff --git a/packages/browser-rum-core/test/formatValidation.ts b/packages/browser-rum-core/test/formatValidation.ts index 8387ee05d0..a5b7369b83 100644 --- a/packages/browser-rum-core/test/formatValidation.ts +++ b/packages/browser-rum-core/test/formatValidation.ts @@ -1,11 +1,12 @@ -import Ajv from 'ajv' -import { registerCleanupTask } from '@datadog/browser-core/test' import type { Context } from '@datadog/browser-core' +import { ResourceType } from '@datadog/browser-core' +import { registerCleanupTask } from '@datadog/browser-core/test' import { combine } from '@datadog/js-core/util' import type { CommonProperties } from '@datadog/browser-rum-core' +import Ajv from 'ajv' import type { LifeCycle, RawRumEventCollectedData } from '../src/domain/lifeCycle' import { LifeCycleEventType } from '../src/domain/lifeCycle' -import type { RawRumEvent } from '../src/rawRumEvent.types' +import { RumEventType, type RawRumEvent } from '../src/rawRumEvent.types' import { allJsonSchemas } from './allJsonSchemas' export function collectAndValidateRawRumEvents(lifeCycle: LifeCycle) { @@ -22,6 +23,11 @@ export function collectAndValidateRawRumEvents(lifeCycle: LifeCycle) { } function validateRumEventFormat(rawRumEvent: RawRumEvent) { + // TODO: Remove this once we have a proper websocket schema + if (rawRumEvent.type === RumEventType.RESOURCE && rawRumEvent.resource.type === ResourceType.WEBSOCKET) { + return + } + const fakeId = '00000000-aaaa-0000-aaaa-000000000000' const fakeContext: Partial = { _dd: { diff --git a/scripts/dev-server/lib/server.ts b/scripts/dev-server/lib/server.ts index c045a925a8..d19330af71 100644 --- a/scripts/dev-server/lib/server.ts +++ b/scripts/dev-server/lib/server.ts @@ -31,6 +31,7 @@ export function runServer({ writeIntakeFile = true }: { writeIntakeFile?: boolea app.use((_req, res, next) => { res.setHeader('Document-Policy', 'js-profiling') + res.setHeader('Access-Control-Allow-Origin', '*') next() }) From 6114191a8b8657e6d9c416c969b0c1a192646e69 Mon Sep 17 00:00:00 2001 From: Boris Dibon Date: Mon, 15 Jun 2026 11:40:01 +0200 Subject: [PATCH 06/16] =?UTF-8?q?=F0=9F=91=8C=20Mark=20raw=20event=20as=20?= =?UTF-8?q?discarded=20if=20trackResources=20is=20false?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/domain/resource/resourceCollection.ts | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/packages/browser-rum-core/src/domain/resource/resourceCollection.ts b/packages/browser-rum-core/src/domain/resource/resourceCollection.ts index 00bbf7ed3c..c345536c41 100644 --- a/packages/browser-rum-core/src/domain/resource/resourceCollection.ts +++ b/packages/browser-rum-core/src/domain/resource/resourceCollection.ts @@ -54,7 +54,7 @@ export function startResourceCollection(lifeCycle: LifeCycle, configuration: Rum const requestRegistry = createRequestRegistry(lifeCycle) lifeCycle.subscribe(LifeCycleEventType.WEBSOCKET_COMPLETED, (event: WebSocketCompleteEvent) => { - handleResource(() => assembleWebSocketResource(event)) + handleResource(() => assembleWebSocketResource(event, configuration)) }) const performanceResourceSubscription = createPerformanceObservable({ type: RumPerformanceEntryType.RESOURCE, @@ -114,7 +114,8 @@ export function startResourceCollection(lifeCycle: LifeCycle, configuration: Rum } function assembleWebSocketResource( - event: WebSocketCompleteEvent + event: WebSocketCompleteEvent, + configuration: RumConfiguration ): RawRumEventCollectedData | undefined { const duration = elapsed(event.startClocks.timeStamp, event.endClocks.timeStamp) @@ -150,7 +151,7 @@ function assembleWebSocketResource( }, }, _dd: { - discarded: false, + discarded: !configuration.trackResources, }, } From ed498f1b2fb2ece3bb2615297973f4e981c6e7c7 Mon Sep 17 00:00:00 2001 From: Boris Dibon Date: Mon, 15 Jun 2026 11:51:58 +0200 Subject: [PATCH 07/16] =?UTF-8?q?=F0=9F=91=8C=20Improve=20listeners=20clea?= =?UTF-8?q?nup?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/browser/webSocketObservable.ts | 21 ++++++++----------- 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/packages/browser-core/src/browser/webSocketObservable.ts b/packages/browser-core/src/browser/webSocketObservable.ts index 77039c8874..d7faf21ef1 100644 --- a/packages/browser-core/src/browser/webSocketObservable.ts +++ b/packages/browser-core/src/browser/webSocketObservable.ts @@ -74,8 +74,6 @@ function createWebSocketObservable() { return undefined } - const stopListeners: Array<() => void> = [] - const { stop: stopInstrumentingConstructor } = instrumentConstructor( globalObject, 'WebSocket', @@ -91,7 +89,7 @@ function createWebSocketObservable() { ...(protocols !== undefined ? { protocols } : {}), startClocks, }) - attachInstanceListeners(instance, observable, stopListeners) + attachInstanceListeners(instance, observable) }) } ) @@ -117,17 +115,11 @@ function createWebSocketObservable() { return () => { stopInstrumentingConstructor() stopInstrumentingSend() - stopListeners.forEach((stop) => stop()) - stopListeners.length = 0 } }) } -function attachInstanceListeners( - instance: WebSocket, - observable: Observable, - stopListeners: Array<() => void> -) { +function attachInstanceListeners(instance: WebSocket, observable: Observable) { const { stop: stopOpen } = addEventListener(instance, 'open', () => { observable.notify({ state: 'open', @@ -135,7 +127,10 @@ function attachInstanceListeners( openClocks: clocksNow(), protocol: instance.protocol || '', }) + + stopOpen() }) + const { stop: stopMessage } = addEventListener(instance, 'message', (event) => { observable.notify({ state: 'message-in', @@ -144,6 +139,7 @@ function attachInstanceListeners( at: clocksNow(), }) }) + const { stop: stopClose } = addEventListener(instance, 'close', (event) => { observable.notify({ state: 'closed', @@ -153,9 +149,10 @@ function attachInstanceListeners( wasClean: event.wasClean, at: clocksNow(), }) - }) - stopListeners.push(stopOpen, stopMessage, stopClose) + stopMessage() + stopClose() + }) } function computePayloadSize(data: unknown): number { From c05c24a10e45f476a477b809bd4d42e16e50a6b1 Mon Sep 17 00:00:00 2001 From: Boris Dibon Date: Mon, 15 Jun 2026 13:14:46 +0200 Subject: [PATCH 08/16] =?UTF-8?q?=F0=9F=91=8C=20Allow=20redaction=20of=20w?= =?UTF-8?q?ebsocket.close=5Freason?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- packages/browser-rum-core/src/domain/assembly.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/browser-rum-core/src/domain/assembly.ts b/packages/browser-rum-core/src/domain/assembly.ts index a55dfb1c1f..bdf9214245 100644 --- a/packages/browser-rum-core/src/domain/assembly.ts +++ b/packages/browser-rum-core/src/domain/assembly.ts @@ -40,6 +40,7 @@ const MODIFIABLE_FIELD_PATHS_BY_EVENT: Record Date: Mon, 15 Jun 2026 13:26:05 +0200 Subject: [PATCH 09/16] =?UTF-8?q?=F0=9F=91=8C=20Serialize=20websocket=20du?= =?UTF-8?q?ration=20fields=20in=20nanoseconds?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/domain/resource/resourceCollection.ts | 10 +++++----- packages/browser-rum-core/src/rawRumEvent.types.ts | 10 +++++----- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/packages/browser-rum-core/src/domain/resource/resourceCollection.ts b/packages/browser-rum-core/src/domain/resource/resourceCollection.ts index c345536c41..80a2a14aee 100644 --- a/packages/browser-rum-core/src/domain/resource/resourceCollection.ts +++ b/packages/browser-rum-core/src/domain/resource/resourceCollection.ts @@ -140,14 +140,14 @@ function assembleWebSocketResource( was_clean: event.wasClean, messages_in: event.messagesIn, messages_out: event.messagesOut, - time_to_first_message_in: event.firstMessageInOffset, - time_to_first_message_out: event.firstMessageOutOffset, + time_to_first_message_in: toServerDuration(event.firstMessageInOffset), + time_to_first_message_out: toServerDuration(event.firstMessageOutOffset), last_message_in_at: event.lastMessageInAt, - longest_inbound_silence: event.longestInboundSilence, - inbound_idle_duration_before_close: event.inboundIdleDurationBeforeClose, + longest_inbound_silence: toServerDuration(event.longestInboundSilence), + inbound_idle_duration_before_close: toServerDuration(event.inboundIdleDurationBeforeClose), buffered_amount_max: event.bufferedAmountMax, protocol: event.protocol, - setup_duration: event.setupDuration, + setup_duration: toServerDuration(event.setupDuration), }, }, _dd: { diff --git a/packages/browser-rum-core/src/rawRumEvent.types.ts b/packages/browser-rum-core/src/rawRumEvent.types.ts index 03e8650194..610de05a97 100644 --- a/packages/browser-rum-core/src/rawRumEvent.types.ts +++ b/packages/browser-rum-core/src/rawRumEvent.types.ts @@ -100,14 +100,14 @@ export interface WebSocketResourceProperties { was_clean?: boolean messages_in: { count: number; size: number } messages_out: { count: number; size: number } - time_to_first_message_in?: Duration - time_to_first_message_out?: Duration + time_to_first_message_in?: ServerDuration + time_to_first_message_out?: ServerDuration last_message_in_at?: TimeStamp - longest_inbound_silence: Duration - inbound_idle_duration_before_close?: Duration + longest_inbound_silence: ServerDuration + inbound_idle_duration_before_close?: ServerDuration buffered_amount_max?: number protocol?: string - setup_duration?: Duration + setup_duration?: ServerDuration } export type NetworkHeaders = Record From 24d650a654b61faf04f146c3e5ac6b65df4e5dae Mon Sep 17 00:00:00 2001 From: Boris Dibon Date: Wed, 17 Jun 2026 12:00:17 +0200 Subject: [PATCH 10/16] =?UTF-8?q?=F0=9F=91=8C=20Remove=20spread=20pattern?= =?UTF-8?q?=20for=20no=20undefined=20values?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- packages/browser-core/src/browser/webSocketObservable.ts | 7 +++++-- .../browser-rum-core/src/domain/webSocketCollection.ts | 4 ++-- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/packages/browser-core/src/browser/webSocketObservable.ts b/packages/browser-core/src/browser/webSocketObservable.ts index d7faf21ef1..afa59c03d0 100644 --- a/packages/browser-core/src/browser/webSocketObservable.ts +++ b/packages/browser-core/src/browser/webSocketObservable.ts @@ -79,16 +79,18 @@ function createWebSocketObservable() { 'WebSocket', ({ parameters, onPostCall }) => { const url = String(parameters[0]) - const protocols = parameters[1] + const protocols = Array.isArray(parameters[1]) ? ([] as string[]).concat(parameters[1]) : parameters[1] const startClocks = clocksNow() + onPostCall((instance) => { observable.notify({ state: 'connecting', instance, url, - ...(protocols !== undefined ? { protocols } : {}), + protocols, startClocks, }) + attachInstanceListeners(instance, observable) }) } @@ -100,6 +102,7 @@ function createWebSocketObservable() { ({ target: instance, parameters: [data], onPostCall }) => { const size = computePayloadSize(data) const bufferedAmountPreSend = instance.bufferedAmount + onPostCall(() => { observable.notify({ state: 'message-out', diff --git a/packages/browser-rum-core/src/domain/webSocketCollection.ts b/packages/browser-rum-core/src/domain/webSocketCollection.ts index 2591737a0c..e32b9ed014 100644 --- a/packages/browser-rum-core/src/domain/webSocketCollection.ts +++ b/packages/browser-rum-core/src/domain/webSocketCollection.ts @@ -111,8 +111,8 @@ export function trackWebSocket( duration: 0 as Duration, context: sanitize({ url: context.url, - ...(context.protocols !== undefined ? { protocols: context.protocols } : {}), - ...(startViewId !== undefined ? { startViewId } : {}), + protocols: context.protocols, + startViewId, }), }) return From 3f2029a2305a9be2dc894a8c20f85d1e6ef09c09 Mon Sep 17 00:00:00 2001 From: Boris Dibon Date: Wed, 17 Jun 2026 17:32:36 +0200 Subject: [PATCH 11/16] =?UTF-8?q?=E2=9C=85=20Improve=20test=20flakyness=20?= =?UTF-8?q?with=20mockClock?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/domain/webSocketCollection.spec.ts | 50 ++++++++++--------- 1 file changed, 27 insertions(+), 23 deletions(-) diff --git a/packages/browser-rum-core/src/domain/webSocketCollection.spec.ts b/packages/browser-rum-core/src/domain/webSocketCollection.spec.ts index 49c02421b8..4eef64e13e 100644 --- a/packages/browser-rum-core/src/domain/webSocketCollection.spec.ts +++ b/packages/browser-rum-core/src/domain/webSocketCollection.spec.ts @@ -1,6 +1,6 @@ import type { WebSocketContext } from '@datadog/browser-core' import { initWebSocketObservable, Observable } from '@datadog/browser-core' -import { registerCleanupTask } from '@datadog/browser-core/test' +import { mockClock, registerCleanupTask, type Clock } from '@datadog/browser-core/test' import type { ClocksState, Duration, RelativeTime } from '@datadog/js-core/time' import { elapsed, relativeToClocks } from '@datadog/js-core/time' import { mockViewHistory } from '../../test' @@ -16,8 +16,10 @@ describe('webSocketCollection', () => { let wsObservable: Observable let completed: WebSocketCompleteEvent[] let wsInstance: WebSocket + let clock: Clock beforeEach(() => { + clock = mockClock() lifeCycle = new LifeCycle() wsObservable = new Observable() completed = [] @@ -45,7 +47,7 @@ describe('webSocketCollection', () => { instance: wsInstance, url, ...(protocols !== undefined ? { protocols } : {}), - startClocks: startClocks ?? relativeToClocks(startRelative as RelativeTime), + startClocks: startClocks ?? relativeToClocks(clock.relative(startRelative)), }) } @@ -53,13 +55,13 @@ describe('webSocketCollection', () => { wsObservable.notify({ state: 'open', instance: wsInstance, - openClocks: openClocks ?? relativeToClocks(openRelative as RelativeTime), + openClocks: openClocks ?? relativeToClocks(clock.relative(openRelative)), protocol, }) } function notifyMessageIn(at: number, size: number) { - wsObservable.notify({ state: 'message-in', instance: wsInstance, size, at: relativeToClocks(at as RelativeTime) }) + wsObservable.notify({ state: 'message-in', instance: wsInstance, size, at: relativeToClocks(clock.relative(at)) }) } function notifyMessageOut(at: number, size: number, bufferedAmountPreSend = 0) { @@ -68,7 +70,7 @@ describe('webSocketCollection', () => { instance: wsInstance, size, bufferedAmountPreSend, - at: relativeToClocks(at as RelativeTime), + at: relativeToClocks(clock.relative(at)), }) } @@ -79,7 +81,7 @@ describe('webSocketCollection', () => { code, reason, wasClean, - at: atClocks ?? relativeToClocks(at as RelativeTime), + at: atClocks ?? relativeToClocks(clock.relative(at)), }) } @@ -220,10 +222,10 @@ describe('webSocketCollection', () => { }) it('records setupDuration as elapsed time from connecting to open', () => { - const startAt = 0 as RelativeTime - const openAt = 10 as RelativeTime - const startClocks = relativeToClocks(startAt) - const openClocks = relativeToClocks(openAt) + const startAt = 0 + const openAt = 10 + const startClocks = relativeToClocks(clock.relative(startAt)) + const openClocks = relativeToClocks(clock.relative(openAt)) const expectedSetupDuration = elapsed(startClocks.timeStamp, openClocks.timeStamp) startTracking() @@ -235,12 +237,12 @@ describe('webSocketCollection', () => { }) it('records setupDuration as elapsed time from connecting to close when open never fires', () => { - const startAt = 0 as RelativeTime - const closeAt = 25 as RelativeTime + const startAt = 0 + const closeAt = 25 const closeCode = 1006 const closeReason = 'abnormal' - const startClocks = relativeToClocks(startAt) - const closeClocks = relativeToClocks(closeAt) + const startClocks = relativeToClocks(clock.relative(startAt)) + const closeClocks = relativeToClocks(clock.relative(closeAt)) const expectedSetupDuration = elapsed(startClocks.timeStamp, closeClocks.timeStamp) startTracking() @@ -274,11 +276,12 @@ describe('webSocketCollection', () => { }) it('captures startViewId and endViewId from viewHistory', () => { - const startViewA = 0 as RelativeTime - const startViewB = 100 as RelativeTime + const startViewB = 100 + const relativeStartViewA = clock.relative(0) + const relativeStartViewB = clock.relative(startViewB) const viewByRelative: Record = { - [startViewA]: { id: 'view-A', startClocks: relativeToClocks(startViewA) }, - [startViewB]: { id: 'view-B', startClocks: relativeToClocks(startViewB) }, + [relativeStartViewA]: { id: 'view-A', startClocks: relativeToClocks(relativeStartViewA) }, + [relativeStartViewB]: { id: 'view-B', startClocks: relativeToClocks(relativeStartViewB) }, } const viewHistory = mockViewHistory() spyOn(viewHistory, 'findView').and.callFake((startTime?: RelativeTime) => @@ -355,9 +358,10 @@ describe('webSocketCollection', () => { }) it('includes url, protocols, and startViewId in the vital context', () => { - const startView = 0 as RelativeTime + const startView = 0 + const relativeStartView = clock.relative(startView) const viewByRelative: Record = { - [startView]: { id: 'view-start', startClocks: relativeToClocks(startView) }, + [relativeStartView]: { id: 'view-start', startClocks: relativeToClocks(relativeStartView) }, } const viewHistory = mockViewHistory() spyOn(viewHistory, 'findView').and.callFake((startTime?: RelativeTime) => @@ -386,13 +390,13 @@ describe('webSocketCollection', () => { const wsInstance = {} as WebSocket const wsUrl = 'wss://example.com/socket' - function notifyConnectionConnecting(startRelative = 0 as RelativeTime) { + function notifyConnectionConnecting(offsetMs = 0) { // initWebSocketObservable returns a singleton, it will notify the same instance for all calls initWebSocketObservable().notify({ state: 'connecting', instance: wsInstance, url: wsUrl, - startClocks: relativeToClocks(startRelative), + startClocks: relativeToClocks(clock.relative(offsetMs)), }) } @@ -424,7 +428,7 @@ describe('webSocketCollection', () => { code: 1000, reason: 'bye', wasClean: true, - at: relativeToClocks(1000 as RelativeTime), + at: relativeToClocks(clock.relative(1000)), }) expect(completed.length).toBe(eventCountAfterStop) From 40c40ce98540313997072bbe2285d8eab00f7b13 Mon Sep 17 00:00:00 2001 From: Boris Dibon Date: Wed, 17 Jun 2026 17:58:12 +0200 Subject: [PATCH 12/16] =?UTF-8?q?=F0=9F=91=8C=20Improve=20computePayloadSi?= =?UTF-8?q?ze?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- packages/browser-core/src/browser/webSocketObservable.ts | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/packages/browser-core/src/browser/webSocketObservable.ts b/packages/browser-core/src/browser/webSocketObservable.ts index afa59c03d0..4b07b96942 100644 --- a/packages/browser-core/src/browser/webSocketObservable.ts +++ b/packages/browser-core/src/browser/webSocketObservable.ts @@ -4,6 +4,7 @@ import type { GlobalObject } from '../tools/globalObject' import { globalObject } from '../tools/globalObject' import { instrumentConstructor, instrumentMethod } from '../tools/instrumentMethod' import { Observable } from '../tools/observable' +import { computeBytesCount } from '../tools/utils/byteUtils' import { addEventListener } from './addEventListener' type GlobalWithWebSocket = GlobalObject & { WebSocket: typeof WebSocket } @@ -160,12 +161,9 @@ function attachInstanceListeners(instance: WebSocket, observable: Observable Date: Thu, 18 Jun 2026 11:37:39 +0200 Subject: [PATCH 13/16] =?UTF-8?q?=F0=9F=91=8C=20Add=20proper=20description?= =?UTF-8?q?=20in=20dev=20extension?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/panel/components/tabs/eventsTab/eventRow.tsx | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/developer-extension/src/panel/components/tabs/eventsTab/eventRow.tsx b/developer-extension/src/panel/components/tabs/eventsTab/eventRow.tsx index b4f103fe34..d298abf2e2 100644 --- a/developer-extension/src/panel/components/tabs/eventsTab/eventRow.tsx +++ b/developer-extension/src/panel/components/tabs/eventsTab/eventRow.tsx @@ -54,6 +54,7 @@ const LOG_STATUS_COLOR = { const RESOURCE_TYPE_LABELS: Record = { xhr: 'XHR', fetch: 'Fetch', + websocket: 'WebSocket', document: 'Document', beacon: 'Beacon', css: 'CSS', @@ -435,9 +436,12 @@ function ErrorDescription({ event }: { event: RumErrorEvent }) { ) } +// TODO: remove this once we introduce websockets on rum-events-format +type RumResourceEventTypeWithWebSocket = RumResourceEvent['resource']['type'] | 'websocket' + function ResourceDescription({ event }: { event: RumResourceEvent }) { - const resourceType = event.resource.type - const isAsset = resourceType !== 'xhr' && resourceType !== 'fetch' + const resourceType = event.resource.type as RumResourceEventTypeWithWebSocket + const isAsset = resourceType !== 'xhr' && resourceType !== 'fetch' && resourceType !== 'websocket' if (isAsset) { return ( @@ -450,7 +454,8 @@ function ResourceDescription({ event }: { event: RumResourceEvent }) { return ( <> - {RESOURCE_TYPE_LABELS[resourceType]} request {event.resource.url} + {`${RESOURCE_TYPE_LABELS[resourceType]} ${resourceType === 'websocket' ? 'connection' : 'request'} `} + {event.resource.url} ) } From 5dfd54002b9513213e9a9b3d5439d6ebb859d3fc Mon Sep 17 00:00:00 2001 From: Boris Dibon Date: Thu, 18 Jun 2026 11:47:49 +0200 Subject: [PATCH 14/16] =?UTF-8?q?=F0=9F=91=8C=20Remove=20fake=20optional?= =?UTF-8?q?=20fields?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- packages/browser-rum-core/src/domain/webSocketCollection.ts | 2 +- packages/browser-rum-core/src/rawRumEvent.types.ts | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/browser-rum-core/src/domain/webSocketCollection.ts b/packages/browser-rum-core/src/domain/webSocketCollection.ts index e32b9ed014..994d172eab 100644 --- a/packages/browser-rum-core/src/domain/webSocketCollection.ts +++ b/packages/browser-rum-core/src/domain/webSocketCollection.ts @@ -33,7 +33,7 @@ export interface WebSocketCompleteEvent { wasClean?: boolean handshakeSucceeded: boolean trackingEndReason: WebSocketTrackingEndReason - setupDuration?: Duration + setupDuration: Duration } interface WebSocketConnection { diff --git a/packages/browser-rum-core/src/rawRumEvent.types.ts b/packages/browser-rum-core/src/rawRumEvent.types.ts index 610de05a97..45c0871263 100644 --- a/packages/browser-rum-core/src/rawRumEvent.types.ts +++ b/packages/browser-rum-core/src/rawRumEvent.types.ts @@ -105,9 +105,9 @@ export interface WebSocketResourceProperties { last_message_in_at?: TimeStamp longest_inbound_silence: ServerDuration inbound_idle_duration_before_close?: ServerDuration - buffered_amount_max?: number + buffered_amount_max: number protocol?: string - setup_duration?: ServerDuration + setup_duration: ServerDuration } export type NetworkHeaders = Record From e1d7b33acc88d3aa9a836ced097b0450348a6480 Mon Sep 17 00:00:00 2001 From: Boris Dibon Date: Thu, 18 Jun 2026 11:48:46 +0200 Subject: [PATCH 15/16] =?UTF-8?q?=F0=9F=91=8C=20Fix=20typo?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- packages/browser-core/src/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/browser-core/src/index.ts b/packages/browser-core/src/index.ts index 0b2ca19044..914ce52f31 100644 --- a/packages/browser-core/src/index.ts +++ b/packages/browser-core/src/index.ts @@ -132,7 +132,7 @@ export type { WebSocketOpenContext, WebSocketMessageInContext, WebSocketMessageOutContext, - WebSocketClosedContext as WebSocketCloseContext, + WebSocketClosedContext, } from './browser/webSocketObservable' export { initWebSocketObservable } from './browser/webSocketObservable' export { fetch } from './browser/fetch' From 6ffe09f7cbc3c916bbff474f39ad084919847601 Mon Sep 17 00:00:00 2001 From: Boris Dibon Date: Fri, 19 Jun 2026 11:35:31 +0200 Subject: [PATCH 16/16] =?UTF-8?q?=F0=9F=91=8C=20Add=20e2e=20tests=20for=20?= =?UTF-8?q?websockets=20capture?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- test/e2e/lib/framework/httpServers.ts | 15 +- test/e2e/lib/framework/serverApps/mock.ts | 53 +++++- test/e2e/lib/helpers/rawDataToString.ts | 12 ++ test/e2e/lib/helpers/validation.ts | 12 +- test/e2e/scenario/rum/websockets.scenario.ts | 177 +++++++++++++++++++ test/e2e/scripts/pinnedProxy.ts | 13 +- 6 files changed, 267 insertions(+), 15 deletions(-) create mode 100644 test/e2e/lib/helpers/rawDataToString.ts create mode 100644 test/e2e/scenario/rum/websockets.scenario.ts diff --git a/test/e2e/lib/framework/httpServers.ts b/test/e2e/lib/framework/httpServers.ts index 8049643be4..5379bd04e8 100644 --- a/test/e2e/lib/framework/httpServers.ts +++ b/test/e2e/lib/framework/httpServers.ts @@ -1,5 +1,6 @@ import * as http from 'http' import type { AddressInfo } from 'net' +import type { Duplex } from 'stream' import { test } from '@playwright/test' import type { Browser } from '@playwright/test' import { getIp } from '../../../envUtils' @@ -10,10 +11,14 @@ const MAX_SERVER_CREATION_RETRY = 5 const PORT_MIN = 9200 const PORT_MAX = 9400 -export type ServerApp = (req: http.IncomingMessage, res: http.ServerResponse) => any +export interface ServerApp { + (req: http.IncomingMessage, res: http.ServerResponse): any + handleUpgrade?: (req: http.IncomingMessage, socket: Duplex, head: Buffer) => void +} export type MockServerApp = ServerApp & { getLargeResponseWroteSize(): number + closeEchoWebSockets?: () => void } export interface Server { @@ -69,6 +74,14 @@ async function createServer(): Promise> { } }) + server.on('upgrade', (req: http.IncomingMessage, socket: Duplex, head: Buffer) => { + if (serverApp?.handleUpgrade) { + serverApp.handleUpgrade(req, socket, head) + } else { + socket.destroy() + } + }) + return { bindServerApp(newServerApp: App) { serverApp = newServerApp diff --git a/test/e2e/lib/framework/serverApps/mock.ts b/test/e2e/lib/framework/serverApps/mock.ts index 148f813600..461c592443 100644 --- a/test/e2e/lib/framework/serverApps/mock.ts +++ b/test/e2e/lib/framework/serverApps/mock.ts @@ -1,5 +1,8 @@ -import type { ServerResponse } from 'http' +import type { IncomingMessage, ServerResponse } from 'http' +import type { Duplex } from 'stream' import * as url from 'url' +import type { WebSocket } from 'ws' +import { WebSocketServer } from 'ws' import cors from 'cors' import qs from 'qs' import express from 'express' @@ -8,6 +11,7 @@ import type { MockServerApp, Servers } from '../httpServers' import { DEV_SERVER_BASE_URL } from '../../helpers/playwright' import type { SetupOptions } from '../pageSetups' import { workerSetup } from '../pageSetups' +import { rawDataToString } from '../../helpers/rawDataToString' export const LARGE_RESPONSE_MIN_BYTE_SIZE = 100_000 @@ -16,6 +20,8 @@ export function createMockServerApp(servers: Servers, setup: string, setupOption const app = express() let largeResponseBytesWritten = 0 + const { webSocketUrl, handleUpgrade, closeEchoWebSockets } = setupEchoWebSockets(servers.base.origin) + app.use(cors()) app.disable('etag') // disable automatic resource caching @@ -173,7 +179,7 @@ export function createMockServerApp(servers: Servers, setup: string, setupOption res.header( 'Content-Security-Policy', [ - `connect-src ${servers.datadogHttpApi.origin} ${servers.base.origin} ${servers.crossOrigin.origin} https://quota.browser-intake-datadoghq.com`, + `connect-src ${servers.datadogHttpApi.origin} ${servers.base.origin} ${webSocketUrl} ${servers.crossOrigin.origin} https://quota.browser-intake-datadoghq.com`, `script-src 'self' 'unsafe-inline' ${servers.crossOrigin.origin}`, "worker-src blob: 'self'", ].join(';') @@ -235,6 +241,8 @@ export function createMockServerApp(servers: Servers, setup: string, setupOption getLargeResponseWroteSize() { return largeResponseBytesWritten }, + handleUpgrade, + closeEchoWebSockets, }) } @@ -261,3 +269,44 @@ function forwardToDevServer(originalUrl: string, res: ServerResponse) { }) .catch(() => console.error(`Error fetching ${url}, did you run 'yarn dev'?`)) } + +function setupEchoWebSockets(httpOrigin: string) { + const webSocketUrl = httpOriginToWebsocketUrl(httpOrigin) + + const echoWebSockets = new Set() + const wss = new WebSocketServer({ noServer: true }) + wss.on('connection', (ws) => { + echoWebSockets.add(ws) + ws.on('close', () => { + echoWebSockets.delete(ws) + }) + ws.on('message', (data) => { + ws.send(`echo: ${rawDataToString(data)}`) + }) + }) + + function httpOriginToWebsocketUrl(httpOrigin: string) { + const parsed = new URL(httpOrigin) + const wsScheme = parsed.protocol === 'https:' ? 'wss:' : 'ws:' + return `${wsScheme}//${parsed.host}` + } + + function handleUpgrade(req: IncomingMessage, socket: Duplex, head: Buffer) { + const pathname = new URL(req.url || '/', 'http://localhost').pathname + if (pathname === '/ws-echo') { + wss.handleUpgrade(req, socket, head, (ws) => { + wss.emit('connection', ws, req) + }) + } else { + socket.destroy() + } + } + + function closeEchoWebSockets() { + for (const client of echoWebSockets) { + client.close() + } + } + + return { webSocketUrl, handleUpgrade, closeEchoWebSockets } +} diff --git a/test/e2e/lib/helpers/rawDataToString.ts b/test/e2e/lib/helpers/rawDataToString.ts new file mode 100644 index 0000000000..56fdd45ce0 --- /dev/null +++ b/test/e2e/lib/helpers/rawDataToString.ts @@ -0,0 +1,12 @@ +import { Buffer } from 'node:buffer' +import type { RawData } from 'ws' + +export function rawDataToString(data: RawData): string { + if (Array.isArray(data)) { + return Buffer.concat(data).toString('utf8') + } + if (Buffer.isBuffer(data)) { + return data.toString('utf8') + } + return Buffer.from(data).toString('utf8') +} diff --git a/test/e2e/lib/helpers/validation.ts b/test/e2e/lib/helpers/validation.ts index b25ba843f8..f1e7ed8c0b 100644 --- a/test/e2e/lib/helpers/validation.ts +++ b/test/e2e/lib/helpers/validation.ts @@ -1,10 +1,13 @@ import fs from 'fs' import * as path from 'node:path' -import type { RumEvent } from '@datadog/browser-rum' +import type { RumEvent, RumResourceEvent } from '@datadog/browser-rum' import ajv from 'ajv' +import { RumEventType } from '@datadog/browser-rum-core' const schemasDir = path.join(path.dirname(require.resolve('@datadog/rum-events-format/package.json')), 'schemas') +type RumResourceEventTypeWithWebSocket = RumResourceEvent['resource']['type'] | 'websocket' + export function validateRumFormat(events: RumEvent[]) { const instance = new ajv({ allErrors: true, @@ -15,6 +18,13 @@ export function validateRumFormat(events: RumEvent[]) { instance.addSchema(allJsonSchemas) events.forEach((rumEvent) => { + // TODO: remove this once we introduce websockets on rum-events-format + if ( + rumEvent.type === RumEventType.RESOURCE && + (rumEvent.resource.type as RumResourceEventTypeWithWebSocket) === 'websocket' + ) { + return + } void instance.validate('rum-events-schema.json', rumEvent) if (instance.errors) { diff --git a/test/e2e/scenario/rum/websockets.scenario.ts b/test/e2e/scenario/rum/websockets.scenario.ts new file mode 100644 index 0000000000..94f0d75059 --- /dev/null +++ b/test/e2e/scenario/rum/websockets.scenario.ts @@ -0,0 +1,177 @@ +import type { RumResourceEvent } from '@datadog/browser-rum' +import type { RawRumEvent } from '@datadog/browser-rum-core' +import { expect, test } from '@playwright/test' +import { createTest, html } from '../../lib/framework' +import { expireSession } from '../../lib/helpers/session' + +const KNOWN_OUT_WS_MESSAGE = 'e2e-ws-ping' +const KNOWN_IN_WS_MESSAGE = `echo: ${KNOWN_OUT_WS_MESSAGE}` + +const WEBSOCKET_TEST_BODY = html` +

+

+ + + + + +` + +type RawRumResource = Extract +type WebSocketResourceProperties = NonNullable + +/** + * RUM resource event for our /ws-echo fixture with `resource.websocket` populated. Public + * {@link RumResourceEvent} omits `websocket` until rum-events-format is updated — use + * {@link isWebSocketResource} instead of casting at every filter/call site. + */ +type RumResourceEventWithWebSocket = RumResourceEvent & { + resource: RumResourceEvent['resource'] & { + websocket: WebSocketResourceProperties + } +} + +test.describe('rum websockets', () => { + createTest('collect websocket-connecting vital and websocket resource when the connection closes') + .withRum({ enableExperimentalFeatures: ['track_web_sockets'] }) + .withBody(WEBSOCKET_TEST_BODY) + .run(async ({ intakeRegistry, flushEvents, page }) => { + await page.locator('#ws-open').click() + await expect(page.locator('#ws-status')).toHaveText('open') + await page.locator('#ws-send').click() + await expect(page.locator('#ws-last-message')).toHaveText(KNOWN_IN_WS_MESSAGE) + await page.locator('#ws-close-client').click() + await expect(page.locator('#ws-status')).toContainText('closed') + + await flushEvents() + + const connectingVital = intakeRegistry.rumVitalEvents.find((e) => e.vital.name === 'websocket-connecting') + expect(connectingVital).toBeDefined() + + const rumEvent = getLastRumResourceEventWithWebSocket(intakeRegistry.rumResourceEvents) + expect(rumEvent).toBeDefined() + + const { websocket: ws } = rumEvent!.resource + + expect(ws.connection_id).toBe(connectingVital!.vital.id) + expect(ws.tracking_end_reason).toBe('close_event') + expect(ws.messages_out.count).toBe(1) + expect(ws.messages_out.size).toBe(KNOWN_OUT_WS_MESSAGE.length) + expect(ws.messages_in.count).toBe(1) + expect(ws.messages_in.size).toBe(KNOWN_IN_WS_MESSAGE.length) + }) + + createTest('websocket resource ends with close_event when the server closes the echo socket') + .withRum({ enableExperimentalFeatures: ['track_web_sockets'] }) + .withBody(WEBSOCKET_TEST_BODY) + .run(async ({ intakeRegistry, flushEvents, page, servers }) => { + await page.locator('#ws-open').click() + await expect(page.locator('#ws-status')).toHaveText('open') + + servers.base.app.closeEchoWebSockets!() + await expect(page.locator('#ws-status')).toContainText('closed') + + await flushEvents() + + const rumEvent = getLastRumResourceEventWithWebSocket(intakeRegistry.rumResourceEvents) + expect(rumEvent).toBeDefined() + + expect(rumEvent!.resource.websocket.tracking_end_reason).toBe('close_event') + }) + + createTest('websocket resource is reported with session_end when the session expires') + .withRum({ enableExperimentalFeatures: ['track_web_sockets'] }) + .withBody(WEBSOCKET_TEST_BODY) + .run(async ({ intakeRegistry, flushEvents, page, browserContext }) => { + await page.locator('#ws-open').click() + await expect(page.locator('#ws-status')).toHaveText('open') + await expireSession(page, browserContext) + + await flushEvents() + + const wsWithSessionEnd = getWebSocketResources(intakeRegistry.rumResourceEvents).find( + (e) => e.resource.websocket.tracking_end_reason === 'session_end' + ) + expect(wsWithSessionEnd).toBeDefined() + }) + + createTest('websocket resource records different start and end views when it spanned multiple views') + .withRum({ enableExperimentalFeatures: ['track_web_sockets'] }) + .withBody(WEBSOCKET_TEST_BODY) + .run(async ({ intakeRegistry, flushEvents, page }) => { + await page.evaluate(() => { + window.DD_RUM!.startView('view-a') + }) + await page.locator('#ws-open').click() + await expect(page.locator('#ws-status')).toHaveText('open') + await page.evaluate(() => { + window.DD_RUM!.startView('view-b') + }) + await page.locator('#ws-close-client').click() + + await flushEvents() + + const rumEvent = getLastRumResourceEventWithWebSocket(intakeRegistry.rumResourceEvents) + expect(rumEvent).toBeDefined() + + const { websocket: ws } = rumEvent!.resource + + expect(ws.start_view_id).toBeDefined() + expect(ws.end_view_id).toBeDefined() + expect(ws.start_view_id).not.toBe(ws.end_view_id) + }) +}) + +function isWebSocketResource(event: RumResourceEvent): event is RumResourceEventWithWebSocket { + // Public RumResourceEvent.resource omits `websocket` until rum-events-format is updated. + const resource = event.resource as unknown as { + url: unknown + type?: string + websocket?: WebSocketResourceProperties + } + + return resource.type === 'websocket' && resource.websocket !== null +} + +function getWebSocketResources(events: RumResourceEvent[]): RumResourceEventWithWebSocket[] { + return events.filter(isWebSocketResource) +} + +function getLastRumResourceEventWithWebSocket(events: RumResourceEvent[]): RumResourceEventWithWebSocket | undefined { + const list = getWebSocketResources(events) + return list.length === 0 ? undefined : list[list.length - 1] +} diff --git a/test/e2e/scripts/pinnedProxy.ts b/test/e2e/scripts/pinnedProxy.ts index 209780a076..1791fa2c8c 100644 --- a/test/e2e/scripts/pinnedProxy.ts +++ b/test/e2e/scripts/pinnedProxy.ts @@ -17,9 +17,10 @@ import http from 'node:http' import process from 'node:process' -import { Buffer } from 'node:buffer' import { WebSocketServer, WebSocket, type RawData } from 'ws' +import { rawDataToString } from '../lib/helpers/rawDataToString.ts' + interface JsonRpcMessage { method?: string guid?: string @@ -135,16 +136,6 @@ httpServer.listen(LISTEN, '127.0.0.1', () => { console.log(`[pinnedProxy] listening on ws://127.0.0.1:${LISTEN}/ -> ws://${UPSTREAM}/`) }) -function rawDataToString(data: RawData): string { - if (Array.isArray(data)) { - return Buffer.concat(data).toString('utf8') - } - if (Buffer.isBuffer(data)) { - return data.toString('utf8') - } - return Buffer.from(data).toString('utf8') -} - function forwardHeader(headers: http.IncomingHttpHeaders, name: string): Record { const value = headers[name] return typeof value === 'string' ? { [name]: value } : {}