diff --git a/developer-extension/src/panel/components/tabs/eventsTab/eventRow.tsx b/developer-extension/src/panel/components/tabs/eventsTab/eventRow.tsx index b4f103fe34..d298abf2e2 100644 --- a/developer-extension/src/panel/components/tabs/eventsTab/eventRow.tsx +++ b/developer-extension/src/panel/components/tabs/eventsTab/eventRow.tsx @@ -54,6 +54,7 @@ const LOG_STATUS_COLOR = { const RESOURCE_TYPE_LABELS: Record = { xhr: 'XHR', fetch: 'Fetch', + websocket: 'WebSocket', document: 'Document', beacon: 'Beacon', css: 'CSS', @@ -435,9 +436,12 @@ function ErrorDescription({ event }: { event: RumErrorEvent }) { ) } +// TODO: remove this once we introduce websockets on rum-events-format +type RumResourceEventTypeWithWebSocket = RumResourceEvent['resource']['type'] | 'websocket' + function ResourceDescription({ event }: { event: RumResourceEvent }) { - const resourceType = event.resource.type - const isAsset = resourceType !== 'xhr' && resourceType !== 'fetch' + const resourceType = event.resource.type as RumResourceEventTypeWithWebSocket + const isAsset = resourceType !== 'xhr' && resourceType !== 'fetch' && resourceType !== 'websocket' if (isAsset) { return ( @@ -450,7 +454,8 @@ function ResourceDescription({ event }: { event: RumResourceEvent }) { return ( <> - {RESOURCE_TYPE_LABELS[resourceType]} request {event.resource.url} + {`${RESOURCE_TYPE_LABELS[resourceType]} ${resourceType === 'websocket' ? 'connection' : 'request'} `} + {event.resource.url} ) } diff --git a/packages/browser-core/src/browser/addEventListener.ts b/packages/browser-core/src/browser/addEventListener.ts index 1fca98d267..c2e01df885 100644 --- a/packages/browser-core/src/browser/addEventListener.ts +++ b/packages/browser-core/src/browser/addEventListener.ts @@ -79,7 +79,9 @@ type EventMapFor = T extends Window ? WorkerEventMap : T extends CookieStore ? CookieStoreEventMap - : Record + : T extends WebSocket + ? WebSocketEventMap + : Record /** * Add an event listener to an event target object (Window, Element, mock object...). This provides diff --git a/packages/browser-core/src/browser/webSocketObservable.spec.ts b/packages/browser-core/src/browser/webSocketObservable.spec.ts new file mode 100644 index 0000000000..cb8989444c --- /dev/null +++ b/packages/browser-core/src/browser/webSocketObservable.spec.ts @@ -0,0 +1,327 @@ +import { registerCleanupTask } from '../../test' +import type { Subscription } from '../tools/observable' +import { setAllowUntrustedEvents } from './addEventListener' +import type { WebSocketContext } from './webSocketObservable' +import { initWebSocketObservable, resetWebSocketObservable } from './webSocketObservable' + +// A minimal stand-in for the native `WebSocket` constructor. We do not connect to a real server in +// unit tests; instead we expose helpers to simulate the browser dispatching events on the instance. +class FakeWebSocket extends EventTarget { + static readonly CONNECTING = 0 + static readonly OPEN = 1 + static readonly CLOSING = 2 + static readonly CLOSED = 3 + + url: string + protocol = '' + bufferedAmount = 0 + readyState: number = FakeWebSocket.CONNECTING + onmessage: ((event: MessageEvent) => void) | null = null + onopen: ((event: Event) => void) | null = null + onclose: ((event: CloseEvent) => void) | null = null + + constructor(url: string | URL, protocols?: string | string[]) { + super() + this.url = String(url) + if (typeof protocols === 'string') { + this.protocol = protocols + } + } + + send(_data: string | ArrayBufferLike | Blob | ArrayBufferView): void { + // no-op; tests will set `bufferedAmount` before calling send to verify it is sampled. + } + + close(_code?: number, _reason?: string): void { + this.readyState = FakeWebSocket.CLOSED + } + + simulateOpen() { + this.readyState = FakeWebSocket.OPEN + const event = new Event('open') + this.dispatchEvent(event) + this.onopen?.(event) + } + + simulateMessage(data: unknown) { + const event = new MessageEvent('message', { data }) + this.dispatchEvent(event) + this.onmessage?.(event) + } + + simulateClose(code: number, reason: string, wasClean: boolean) { + this.readyState = FakeWebSocket.CLOSED + // CloseEvent is not always constructable in test environments; use a plain Event with assigned fields. + const event = Object.assign(new Event('close'), { code, reason, wasClean }) as CloseEvent + this.dispatchEvent(event) + this.onclose?.(event) + } +} + +type FakeWebSocketConstructor = typeof FakeWebSocket + +const windowAsWebSocketHost = window as unknown as { WebSocket: FakeWebSocketConstructor } + +describe('webSocketObservable', () => { + let originalWebSocket: FakeWebSocketConstructor + let contexts: WebSocketContext[] + let subscription: Subscription | undefined + + beforeEach(() => { + originalWebSocket = windowAsWebSocketHost.WebSocket + windowAsWebSocketHost.WebSocket = FakeWebSocket + contexts = [] + + registerCleanupTask(() => { + subscription?.unsubscribe() + subscription = undefined + resetWebSocketObservable() + windowAsWebSocketHost.WebSocket = originalWebSocket + }) + }) + + function startTracking() { + setAllowUntrustedEvents(true) + subscription = initWebSocketObservable().subscribe((context) => { + contexts.push(context) + }) + } + + function getContexts(state: T) { + return contexts.filter((context): context is Extract => context.state === state) + } + + describe('when tracking is started', () => { + beforeEach(() => { + startTracking() + }) + + describe('connecting context', () => { + it('emits a "connecting" context when a WebSocket is constructed', () => { + const url = 'wss://example.com/socket' + const ws = new windowAsWebSocketHost.WebSocket(url) + + const connectingContexts = getContexts('connecting') + expect(connectingContexts.length).toBe(1) + expect(connectingContexts[0].url).toBe(url) + expect(connectingContexts[0].instance).toBe(ws as unknown as WebSocket) + expect(connectingContexts[0].startClocks.timeStamp).toEqual(jasmine.any(Number)) + }) + + it('coerces URL objects to strings in the "connecting" context', () => { + const url = 'wss://example.com/socket' + new windowAsWebSocketHost.WebSocket(new URL(url)) + + expect(getContexts('connecting')[0].url).toBe(url) + }) + + it('does not include protocols in the "connecting" context when omitted', () => { + new windowAsWebSocketHost.WebSocket('wss://example.com/socket') + + expect(getContexts('connecting')[0].protocols).toBeUndefined() + }) + + it('includes string protocols in the "connecting" context', () => { + const url = 'wss://example.com/socket' + const protocols = 'chat.v1' + new windowAsWebSocketHost.WebSocket(url, protocols) + + expect(getContexts('connecting')[0].protocols).toBe(protocols) + }) + + it('includes array protocols in the "connecting" context', () => { + const url = 'wss://example.com/socket' + const protocols = ['chat.v1', 'json'] + new windowAsWebSocketHost.WebSocket(url, protocols) + + expect(getContexts('connecting')[0].protocols).toEqual(protocols) + }) + }) + + describe('preservation of native behavior', () => { + it('does not clobber a customer-set onmessage handler', () => { + const ws = new windowAsWebSocketHost.WebSocket('wss://example.com/socket') + const customerHandler = jasmine.createSpy() + ws.onmessage = customerHandler + + ws.simulateMessage('hello') + + expect(customerHandler).toHaveBeenCalledTimes(1) + expect(getContexts('message-in').length).toBe(1) + }) + + it('does not clobber a customer-set onopen handler', () => { + const ws = new windowAsWebSocketHost.WebSocket('wss://example.com/socket') + const customerHandler = jasmine.createSpy() + ws.onopen = customerHandler + + ws.simulateOpen() + + expect(customerHandler).toHaveBeenCalledTimes(1) + expect(getContexts('open').length).toBe(1) + }) + + it('does not clobber a customer-set onclose handler', () => { + const ws = new windowAsWebSocketHost.WebSocket('wss://example.com/socket') + const customerHandler = jasmine.createSpy() + ws.onclose = customerHandler + + ws.simulateClose(1000, 'bye', true) + + expect(customerHandler).toHaveBeenCalledTimes(1) + expect(getContexts('closed').length).toBe(1) + }) + }) + + describe('open context', () => { + it('emits an "open" context when the WebSocket opens', () => { + const ws = new windowAsWebSocketHost.WebSocket('wss://example.com/socket') + const negotiatedProtocol = 'chat.v1' + ws.protocol = negotiatedProtocol + ws.simulateOpen() + + const openContexts = getContexts('open') + expect(openContexts.length).toBe(1) + expect(openContexts[0].protocol).toBe(negotiatedProtocol) + expect(openContexts[0].instance).toBe(ws as unknown as WebSocket) + expect(openContexts[0].openClocks.timeStamp).toEqual(jasmine.any(Number)) + }) + + it('emits an "open" context with empty protocol when no sub-protocol negotiated', () => { + const ws = new windowAsWebSocketHost.WebSocket('wss://example.com/socket') + ws.simulateOpen() + + const openContexts = getContexts('open') + expect(openContexts.length).toBe(1) + expect(openContexts[0].protocol).toBe('') + }) + }) + + describe('message-in context', () => { + it('emits "message-in" with byte-length size for string payloads', () => { + const ws = new windowAsWebSocketHost.WebSocket('wss://example.com/socket') + ws.simulateOpen() + const payload = 'hello world' + ws.simulateMessage(payload) + + const messageInContexts = getContexts('message-in') + expect(messageInContexts.length).toBe(1) + expect(messageInContexts[0].size).toBe(payload.length) + }) + + it('emits "message-in" with UTF-8 byte length for multi-byte strings', () => { + const ws = new windowAsWebSocketHost.WebSocket('wss://example.com/socket') + ws.simulateOpen() + // 'é' is 2 bytes in UTF-8 and 'あ' is 3 bytes; total is 5 bytes for 2 chars + const payload = 'éあ' + ws.simulateMessage(payload) + + expect(getContexts('message-in')[0].size).toBe(new TextEncoder().encode(payload).byteLength) + }) + + it('emits "message-in" with byteLength for ArrayBuffer payloads', () => { + const ws = new windowAsWebSocketHost.WebSocket('wss://example.com/socket') + ws.simulateOpen() + const byteLength = 16 + ws.simulateMessage(new ArrayBuffer(byteLength)) + + expect(getContexts('message-in')[0].size).toBe(byteLength) + }) + + it('emits "message-in" with byteLength for ArrayBufferView payloads', () => { + const ws = new windowAsWebSocketHost.WebSocket('wss://example.com/socket') + ws.simulateOpen() + const viewByteLength = 12 + ws.simulateMessage(new Uint8Array(new ArrayBuffer(32), 4, viewByteLength)) + + expect(getContexts('message-in')[0].size).toBe(viewByteLength) + }) + + it('emits "message-in" with size for Blob payloads', () => { + const ws = new windowAsWebSocketHost.WebSocket('wss://example.com/socket') + ws.simulateOpen() + const blob = new Blob(['hello']) + ws.simulateMessage(blob) + + expect(getContexts('message-in')[0].size).toBe(blob.size) + }) + }) + + describe('message-out context', () => { + it('emits "message-out" with size and bufferedAmountPreSend for string payloads', () => { + const ws = new windowAsWebSocketHost.WebSocket('wss://example.com/socket') + const bufferedAmountPreSend = 42 + ws.bufferedAmount = bufferedAmountPreSend + const payload = 'hello' + ws.send(payload) + + const messageOutContexts = getContexts('message-out') + expect(messageOutContexts.length).toBe(1) + expect(messageOutContexts[0].size).toBe(payload.length) + expect(messageOutContexts[0].bufferedAmountPreSend).toBe(bufferedAmountPreSend) + expect(messageOutContexts[0].at.timeStamp).toEqual(jasmine.any(Number)) + }) + + it('emits "message-out" with byteLength for ArrayBuffer payloads', () => { + const ws = new windowAsWebSocketHost.WebSocket('wss://example.com/socket') + const byteLength = 8 + ws.send(new ArrayBuffer(byteLength)) + + expect(getContexts('message-out')[0].size).toBe(byteLength) + }) + + it('emits "message-out" with byteLength for ArrayBufferView payloads', () => { + const ws = new windowAsWebSocketHost.WebSocket('wss://example.com/socket') + const viewByteLength = 10 + ws.send(new Uint8Array(new ArrayBuffer(20), 2, viewByteLength)) + + expect(getContexts('message-out')[0].size).toBe(viewByteLength) + }) + + it('emits "message-out" with size for Blob payloads', () => { + const ws = new windowAsWebSocketHost.WebSocket('wss://example.com/socket') + const blob = new Blob(['hello world']) + ws.send(blob) + + expect(getContexts('message-out')[0].size).toBe(blob.size) + }) + }) + + describe('closed context', () => { + it('emits a "closed" context with code, reason, and wasClean', () => { + const ws = new windowAsWebSocketHost.WebSocket('wss://example.com/socket') + const closeCode = 1000 + const closeReason = 'bye' + const wasClean = true + ws.simulateClose(closeCode, closeReason, wasClean) + + const closeContexts = getContexts('closed') + expect(closeContexts.length).toBe(1) + expect(closeContexts[0].code).toBe(closeCode) + expect(closeContexts[0].reason).toBe(closeReason) + expect(closeContexts[0].wasClean).toBe(wasClean) + expect(closeContexts[0].at.timeStamp).toEqual(jasmine.any(Number)) + }) + }) + + describe('subscription lifecycle', () => { + it('restores the native WebSocket constructor when all subscribers unsubscribe', () => { + subscription?.unsubscribe() + subscription = undefined + + expect(windowAsWebSocketHost.WebSocket).toBe(FakeWebSocket) + }) + + it('does not emit any further events after all subscribers unsubscribe', () => { + subscription?.unsubscribe() + subscription = undefined + + const ws = new windowAsWebSocketHost.WebSocket('wss://example.com/socket') + ws.simulateOpen() + ws.send('hello') + + expect(contexts.length).toBe(0) + }) + }) + }) +}) diff --git a/packages/browser-core/src/browser/webSocketObservable.ts b/packages/browser-core/src/browser/webSocketObservable.ts new file mode 100644 index 0000000000..4b07b96942 --- /dev/null +++ b/packages/browser-core/src/browser/webSocketObservable.ts @@ -0,0 +1,182 @@ +import type { ClocksState } from '@datadog/js-core/time' +import { clocksNow } from '@datadog/js-core/time' +import type { GlobalObject } from '../tools/globalObject' +import { globalObject } from '../tools/globalObject' +import { instrumentConstructor, instrumentMethod } from '../tools/instrumentMethod' +import { Observable } from '../tools/observable' +import { computeBytesCount } from '../tools/utils/byteUtils' +import { addEventListener } from './addEventListener' + +type GlobalWithWebSocket = GlobalObject & { WebSocket: typeof WebSocket } + +function isGlobalWithWebSocket(global: GlobalObject): global is GlobalWithWebSocket { + return typeof (global as { WebSocket?: unknown }).WebSocket === 'function' +} + +export interface WebSocketConnectingContext { + state: 'connecting' + instance: WebSocket + url: string + protocols?: string | string[] + startClocks: ClocksState +} + +export interface WebSocketOpenContext { + state: 'open' + instance: WebSocket + openClocks: ClocksState + protocol: string +} + +export interface WebSocketMessageInContext { + state: 'message-in' + instance: WebSocket + size: number + at: ClocksState +} + +export interface WebSocketMessageOutContext { + state: 'message-out' + instance: WebSocket + size: number + bufferedAmountPreSend: number + at: ClocksState +} + +export interface WebSocketClosedContext { + state: 'closed' + instance: WebSocket + code: number + reason: string + wasClean: boolean + at: ClocksState +} + +export type WebSocketContext = + | WebSocketConnectingContext + | WebSocketOpenContext + | WebSocketMessageInContext + | WebSocketMessageOutContext + | WebSocketClosedContext + +let webSocketObservable: Observable | undefined + +export function initWebSocketObservable(): Observable { + if (!webSocketObservable) { + webSocketObservable = createWebSocketObservable() + } + + return webSocketObservable +} + +function createWebSocketObservable() { + return new Observable((observable) => { + if (!isGlobalWithWebSocket(globalObject)) { + return undefined + } + + const { stop: stopInstrumentingConstructor } = instrumentConstructor( + globalObject, + 'WebSocket', + ({ parameters, onPostCall }) => { + const url = String(parameters[0]) + const protocols = Array.isArray(parameters[1]) ? ([] as string[]).concat(parameters[1]) : parameters[1] + const startClocks = clocksNow() + + onPostCall((instance) => { + observable.notify({ + state: 'connecting', + instance, + url, + protocols, + startClocks, + }) + + attachInstanceListeners(instance, observable) + }) + } + ) + + const { stop: stopInstrumentingSend } = instrumentMethod( + globalObject.WebSocket.prototype, + 'send', + ({ target: instance, parameters: [data], onPostCall }) => { + const size = computePayloadSize(data) + const bufferedAmountPreSend = instance.bufferedAmount + + onPostCall(() => { + observable.notify({ + state: 'message-out', + instance, + size, + bufferedAmountPreSend, + at: clocksNow(), + }) + }) + } + ) + + return () => { + stopInstrumentingConstructor() + stopInstrumentingSend() + } + }) +} + +function attachInstanceListeners(instance: WebSocket, observable: Observable) { + const { stop: stopOpen } = addEventListener(instance, 'open', () => { + observable.notify({ + state: 'open', + instance, + openClocks: clocksNow(), + protocol: instance.protocol || '', + }) + + stopOpen() + }) + + const { stop: stopMessage } = addEventListener(instance, 'message', (event) => { + observable.notify({ + state: 'message-in', + instance, + size: computePayloadSize(event.data), + at: clocksNow(), + }) + }) + + const { stop: stopClose } = addEventListener(instance, 'close', (event) => { + observable.notify({ + state: 'closed', + instance, + code: event.code, + reason: event.reason, + wasClean: event.wasClean, + at: clocksNow(), + }) + + stopMessage() + stopClose() + }) +} + +function computePayloadSize(data: unknown): number { + if (typeof data === 'string') { + return computeBytesCount(data) + } + if (data instanceof ArrayBuffer || ArrayBuffer.isView(data)) { + return data.byteLength + } + if (typeof Blob !== 'undefined' && data instanceof Blob) { + return data.size + } + return 0 +} + +/** + * Reset the WebSocket observable global state. Test-only. + * + * @internal + */ +export function resetWebSocketObservable() { + webSocketObservable = undefined +} diff --git a/packages/browser-core/src/domain/resourceUtils.ts b/packages/browser-core/src/domain/resourceUtils.ts index f5849134c0..1d79857a1b 100644 --- a/packages/browser-core/src/domain/resourceUtils.ts +++ b/packages/browser-core/src/domain/resourceUtils.ts @@ -8,6 +8,7 @@ export const ResourceType = { IMAGE: 'image', FONT: 'font', MEDIA: 'media', + WEBSOCKET: 'websocket', OTHER: 'other', } as const diff --git a/packages/browser-core/src/index.ts b/packages/browser-core/src/index.ts index 2f832946cc..914ce52f31 100644 --- a/packages/browser-core/src/index.ts +++ b/packages/browser-core/src/index.ts @@ -126,6 +126,15 @@ export type { XhrCompleteContext, XhrStartContext, XhrContext } from './browser/ export { initXhrObservable } from './browser/xhrObservable' export type { FetchResolveContext, FetchStartContext, FetchContext } from './browser/fetchObservable' export { initFetchObservable, ResponseBodyAction } from './browser/fetchObservable' +export type { + WebSocketContext, + WebSocketConnectingContext, + WebSocketOpenContext, + WebSocketMessageInContext, + WebSocketMessageOutContext, + WebSocketClosedContext, +} from './browser/webSocketObservable' +export { initWebSocketObservable } from './browser/webSocketObservable' export { fetch } from './browser/fetch' export type { PageMayExitEvent } from './browser/pageMayExitObservable' export { createPageMayExitObservable, PageExitReason, isPageExitReason } from './browser/pageMayExitObservable' diff --git a/packages/browser-core/src/tools/experimentalFeatures.ts b/packages/browser-core/src/tools/experimentalFeatures.ts index 65c39b5ffa..1f988fba6d 100644 --- a/packages/browser-core/src/tools/experimentalFeatures.ts +++ b/packages/browser-core/src/tools/experimentalFeatures.ts @@ -16,6 +16,7 @@ import { objectHasValue } from './utils/objectUtils' export enum ExperimentalFeature { TRACK_INTAKE_REQUESTS = 'track_intake_requests', PARTIAL_VIEW_UPDATES = 'partial_view_updates', + TRACK_WEB_SOCKETS = 'track_web_sockets', } const enabledExperimentalFeatures: Set = new Set() diff --git a/packages/browser-core/test/forEach.spec.ts b/packages/browser-core/test/forEach.spec.ts index f95041eb14..1c393d7960 100644 --- a/packages/browser-core/test/forEach.spec.ts +++ b/packages/browser-core/test/forEach.spec.ts @@ -4,6 +4,7 @@ import { resetValueHistoryGlobals } from '../src/tools/valueHistory' import { resetFetchObservable } from '../src/browser/fetchObservable' import { resetConsoleObservable } from '../src/domain/console/consoleObservable' import { resetXhrObservable } from '../src/browser/xhrObservable' +import { resetWebSocketObservable } from '../src/browser/webSocketObservable' import { resetGetCurrentSite } from '../src/browser/cookie' import { resetReplayStats } from '../../browser-rum/src/domain/replayStats' import { resetInteractionCountPolyfill } from '../../browser-rum-core/src/domain/view/viewMetrics/interactionCountPolyfill' @@ -34,6 +35,7 @@ afterEach(() => { resetFetchObservable() resetConsoleObservable() resetXhrObservable() + resetWebSocketObservable() resetGetCurrentSite() resetReplayStats() resetMonitor() diff --git a/packages/browser-rum-core/src/boot/startRum.ts b/packages/browser-rum-core/src/boot/startRum.ts index 70923695e0..32a324d808 100644 --- a/packages/browser-rum-core/src/boot/startRum.ts +++ b/packages/browser-rum-core/src/boot/startRum.ts @@ -17,6 +17,8 @@ import { startGlobalContext, startUserContext, startTabContext, + isExperimentalFeatureEnabled, + ExperimentalFeature, } from '@datadog/browser-core' import { createDOMMutationObservable } from '../browser/domMutationObservable' import { createWindowOpenObservable } from '../browser/windowOpenObservable' @@ -24,6 +26,7 @@ import { startInternalContext } from '../domain/contexts/internalContext' import { LifeCycle, LifeCycleEventType } from '../domain/lifeCycle' import { startViewHistory } from '../domain/contexts/viewHistory' import { startRequestCollection } from '../domain/requestCollection' +import { startWebSocketCollection } from '../domain/webSocketCollection' import { startActionCollection } from '../domain/action/actionCollection' import { startErrorCollection } from '../domain/error/errorCollection' import { startResourceCollection } from '../domain/resource/resourceCollection' @@ -230,6 +233,11 @@ export function startRumEventCollection( const vitalCollection = startVitalCollection(lifeCycle, pageStateHistory) + if (isExperimentalFeatureEnabled(ExperimentalFeature.TRACK_WEB_SOCKETS)) { + const webSocketCollection = startWebSocketCollection(lifeCycle, viewHistory, vitalCollection.addDurationVital) + cleanupTasks.push(webSocketCollection.stop) + } + const internalContext = startInternalContext( configuration.applicationId, sessionManager, diff --git a/packages/browser-rum-core/src/domain/assembly.ts b/packages/browser-rum-core/src/domain/assembly.ts index a55dfb1c1f..bdf9214245 100644 --- a/packages/browser-rum-core/src/domain/assembly.ts +++ b/packages/browser-rum-core/src/domain/assembly.ts @@ -40,6 +40,7 @@ const MODIFIABLE_FIELD_PATHS_BY_EVENT: Record { + handleResource(() => assembleWebSocketResource(event, configuration)) + }) const performanceResourceSubscription = createPerformanceObservable({ type: RumPerformanceEntryType.RESOURCE, buffered: true, @@ -108,6 +113,56 @@ export function startResourceCollection(lifeCycle: LifeCycle, configuration: Rum } } +function assembleWebSocketResource( + event: WebSocketCompleteEvent, + configuration: RumConfiguration +): RawRumEventCollectedData | undefined { + const duration = elapsed(event.startClocks.timeStamp, event.endClocks.timeStamp) + + const rawRumEvent: RawRumResourceEvent = { + date: event.startClocks.timeStamp, + type: RumEventType.RESOURCE, + resource: { + id: generateUUID(), + type: ResourceType.WEBSOCKET, + url: event.url, + duration: toServerDuration(duration), + websocket: { + connection_id: event.connectionId, + handshake_succeeded: event.handshakeSucceeded, + start_time: event.startClocks.timeStamp, + end_time: event.endClocks.timeStamp, + start_view_id: event.startViewId, + end_view_id: event.endViewId, + tracking_end_reason: event.trackingEndReason, + close_code: event.closeCode, + close_reason: event.closeReason, + was_clean: event.wasClean, + messages_in: event.messagesIn, + messages_out: event.messagesOut, + time_to_first_message_in: toServerDuration(event.firstMessageInOffset), + time_to_first_message_out: toServerDuration(event.firstMessageOutOffset), + last_message_in_at: event.lastMessageInAt, + longest_inbound_silence: toServerDuration(event.longestInboundSilence), + inbound_idle_duration_before_close: toServerDuration(event.inboundIdleDurationBeforeClose), + buffered_amount_max: event.bufferedAmountMax, + protocol: event.protocol, + setup_duration: toServerDuration(event.setupDuration), + }, + }, + _dd: { + discarded: !configuration.trackResources, + }, + } + + return { + startClocks: event.startClocks, + duration, + rawRumEvent, + domainContext: {}, + } +} + function assembleResource( entry: ResourceLikeEntry, request: RequestCompleteEvent | undefined, diff --git a/packages/browser-rum-core/src/domain/webSocketCollection.spec.ts b/packages/browser-rum-core/src/domain/webSocketCollection.spec.ts new file mode 100644 index 0000000000..4eef64e13e --- /dev/null +++ b/packages/browser-rum-core/src/domain/webSocketCollection.spec.ts @@ -0,0 +1,437 @@ +import type { WebSocketContext } from '@datadog/browser-core' +import { initWebSocketObservable, Observable } from '@datadog/browser-core' +import { mockClock, registerCleanupTask, type Clock } from '@datadog/browser-core/test' +import type { ClocksState, Duration, RelativeTime } from '@datadog/js-core/time' +import { elapsed, relativeToClocks } from '@datadog/js-core/time' +import { mockViewHistory } from '../../test' +import { VitalType } from '../rawRumEvent.types' +import type { ViewHistoryEntry } from './contexts/viewHistory' +import { LifeCycle, LifeCycleEventType } from './lifeCycle' +import type { DurationVital } from './vital/vitalCollection' +import type { WebSocketCompleteEvent } from './webSocketCollection' +import { startWebSocketCollection, trackWebSocket, WEBSOCKET_CONNECTING_VITAL_NAME } from './webSocketCollection' + +describe('webSocketCollection', () => { + let lifeCycle: LifeCycle + let wsObservable: Observable + let completed: WebSocketCompleteEvent[] + let wsInstance: WebSocket + let clock: Clock + + beforeEach(() => { + clock = mockClock() + lifeCycle = new LifeCycle() + wsObservable = new Observable() + completed = [] + wsInstance = {} as WebSocket + lifeCycle.subscribe(LifeCycleEventType.WEBSOCKET_COMPLETED, (webSocket) => { + completed.push(webSocket) + }) + }) + + function startTracking( + viewHistory = mockViewHistory(), + addDurationVital: (vital: DurationVital) => void = jasmine.createSpy() + ) { + return trackWebSocket(lifeCycle, wsObservable, viewHistory, addDurationVital) + } + + function notifyConnecting( + startRelative = 0, + url = 'wss://example.com/socket', + startClocks?: ClocksState, + protocols?: string | string[] + ) { + wsObservable.notify({ + state: 'connecting', + instance: wsInstance, + url, + ...(protocols !== undefined ? { protocols } : {}), + startClocks: startClocks ?? relativeToClocks(clock.relative(startRelative)), + }) + } + + function notifyOpen(openRelative = 10, protocol = '', openClocks?: ClocksState) { + wsObservable.notify({ + state: 'open', + instance: wsInstance, + openClocks: openClocks ?? relativeToClocks(clock.relative(openRelative)), + protocol, + }) + } + + function notifyMessageIn(at: number, size: number) { + wsObservable.notify({ state: 'message-in', instance: wsInstance, size, at: relativeToClocks(clock.relative(at)) }) + } + + function notifyMessageOut(at: number, size: number, bufferedAmountPreSend = 0) { + wsObservable.notify({ + state: 'message-out', + instance: wsInstance, + size, + bufferedAmountPreSend, + at: relativeToClocks(clock.relative(at)), + }) + } + + function notifyClosed(at: number, code: number, reason: string, wasClean: boolean, atClocks?: ClocksState) { + wsObservable.notify({ + state: 'closed', + instance: wsInstance, + code, + reason, + wasClean, + at: atClocks ?? relativeToClocks(clock.relative(at)), + }) + } + + describe('handshakeSucceeded', () => { + it('is true when the open event fired before completion', () => { + const tracker = startTracking() + + notifyConnecting() + notifyOpen(10) + notifyClosed(40, 1000, 'bye', true) + expect(completed[0].handshakeSucceeded).toBeTrue() + + notifyConnecting() + notifyOpen(10) + tracker.flushOpenConnections('session_end') + expect(completed[1].handshakeSucceeded).toBeTrue() + }) + + it('is false when the open event never fired before completion', () => { + const tracker = startTracking() + notifyConnecting() + notifyClosed(25, 1006, 'abnormal', false) + expect(completed[0].handshakeSucceeded).toBeFalse() + + notifyConnecting() + tracker.flushOpenConnections('session_end') + expect(completed[1].handshakeSucceeded).toBeFalse() + }) + }) + + it('emits a completed event on close with tracking_end_reason="close_event"', () => { + const url = 'wss://example.com/socket' + const protocol = 'chat.v1' + const messageInSize = 100 + const messageOutSize = 50 + const bufferedAmount = 8 + const closeCode = 1000 + const closeReason = 'bye' + + startTracking() + notifyConnecting(0, url) + notifyOpen(10, protocol) + notifyMessageIn(20, messageInSize) + notifyMessageOut(30, messageOutSize, bufferedAmount) + notifyClosed(40, closeCode, closeReason, true) + + expect(completed.length).toBe(1) + const webSocket = completed[0] + + expect(webSocket.trackingEndReason).toBe('close_event') + expect(webSocket.closeCode).toBe(closeCode) + expect(webSocket.closeReason).toBe(closeReason) + expect(webSocket.wasClean).toBeTrue() + expect(webSocket.url).toBe(url) + expect(webSocket.protocol).toBe(protocol) + expect(webSocket.messagesIn).toEqual({ count: 1, size: messageInSize }) + expect(webSocket.messagesOut).toEqual({ count: 1, size: messageOutSize }) + expect(webSocket.bufferedAmountMax).toBe(bufferedAmount) + }) + + it('generates a unique connection_id per connection', () => { + startTracking() + notifyConnecting() + notifyClosed(1, 1000, 'reason_a', true) + const firstId = completed[0].connectionId + + wsInstance = {} as WebSocket + notifyConnecting() + notifyClosed(1, 1000, 'reason_b', true) + + expect(completed[1].connectionId).not.toBe(firstId) + }) + + it('records firstMessageInOffset / firstMessageOutOffset as offsets from open', () => { + const openAt = 10 + const firstMessageInAt = 13 + const firstMessageOutAt = 17 + + startTracking() + notifyConnecting() + notifyOpen(openAt) + notifyMessageIn(firstMessageInAt, 1) + notifyMessageIn(25, 1) // not first; should not update + notifyMessageOut(firstMessageOutAt, 1) + notifyClosed(30, 1000, 'bye', true) + + const webSocket = completed[0] + expect(webSocket.firstMessageInOffset).toBe((firstMessageInAt - openAt) as Duration) + expect(webSocket.firstMessageOutOffset).toBe((firstMessageOutAt - openAt) as Duration) + }) + + it('tracks longestInboundSilence from consecutive message-in only', () => { + startTracking() + notifyConnecting() + notifyOpen(10) + notifyMessageIn(20, 1) + notifyMessageIn(50, 1) // gap 30 + notifyMessageIn(75, 1) // gap 25 + notifyClosed(100, 1000, 'bye', true) + + expect(completed[0].longestInboundSilence).toBe(30 as Duration) + }) + + it('ignores message-out when computing longestInboundSilence', () => { + startTracking() + notifyConnecting() + notifyOpen(10) + notifyMessageIn(20, 1) + notifyMessageOut(100, 1) + notifyMessageIn(130, 1) // gap from last in (20) to 130 + notifyClosed(200, 1000, 'bye', true) + + expect(completed[0].longestInboundSilence).toBe(110 as Duration) + }) + + it('records inboundIdleDurationBeforeClose from last message-in to close', () => { + const lastMessageInAt = 20 + const closeAt = 50 + + startTracking() + notifyConnecting() + notifyOpen(10) + notifyMessageIn(lastMessageInAt, 1) + notifyClosed(closeAt, 1000, 'bye', true) + + expect(completed[0].inboundIdleDurationBeforeClose).toBe((closeAt - lastMessageInAt) as Duration) + }) + + it('leaves inboundIdleDurationBeforeClose and lastMessageInAt undefined when no message was received', () => { + startTracking() + notifyConnecting() + notifyOpen(10) + notifyMessageOut(30, 1) + notifyClosed(50, 1000, 'bye', true) + + expect(completed[0].lastMessageInAt).toBeUndefined() + expect(completed[0].inboundIdleDurationBeforeClose).toBeUndefined() + }) + + it('records setupDuration as elapsed time from connecting to open', () => { + const startAt = 0 + const openAt = 10 + const startClocks = relativeToClocks(clock.relative(startAt)) + const openClocks = relativeToClocks(clock.relative(openAt)) + const expectedSetupDuration = elapsed(startClocks.timeStamp, openClocks.timeStamp) + + startTracking() + notifyConnecting(startAt, 'wss://example.com/socket', startClocks) + notifyOpen(openAt, '', openClocks) + notifyClosed(40, 1000, 'bye', true) + + expect(completed[0].setupDuration).toBe(expectedSetupDuration) + }) + + it('records setupDuration as elapsed time from connecting to close when open never fires', () => { + const startAt = 0 + const closeAt = 25 + const closeCode = 1006 + const closeReason = 'abnormal' + const startClocks = relativeToClocks(clock.relative(startAt)) + const closeClocks = relativeToClocks(clock.relative(closeAt)) + const expectedSetupDuration = elapsed(startClocks.timeStamp, closeClocks.timeStamp) + + startTracking() + notifyConnecting(startAt, 'wss://example.com/socket', startClocks) + notifyClosed(closeAt, closeCode, closeReason, false, closeClocks) + + expect(completed[0].setupDuration).toBe(expectedSetupDuration) + }) + + it('records setupDuration on session_end flush when the connection never opened', () => { + const tracker = startTracking() + notifyConnecting() + tracker.flushOpenConnections('session_end') + + const webSocket = completed[0] + expect(webSocket.setupDuration).toBe(elapsed(webSocket.startClocks.timeStamp, webSocket.endClocks.timeStamp)) + }) + + it('collects buffered_amount_max from message-out events', () => { + const peakBufferedAmount = 100 + + startTracking() + notifyConnecting() + notifyOpen(10) + notifyMessageOut(20, 1, 10) + notifyMessageOut(30, 1, peakBufferedAmount) + notifyMessageOut(40, 1, 50) + notifyClosed(50, 1000, 'bye', true) + + expect(completed[0].bufferedAmountMax).toBe(peakBufferedAmount) + }) + + it('captures startViewId and endViewId from viewHistory', () => { + const startViewB = 100 + const relativeStartViewA = clock.relative(0) + const relativeStartViewB = clock.relative(startViewB) + const viewByRelative: Record = { + [relativeStartViewA]: { id: 'view-A', startClocks: relativeToClocks(relativeStartViewA) }, + [relativeStartViewB]: { id: 'view-B', startClocks: relativeToClocks(relativeStartViewB) }, + } + const viewHistory = mockViewHistory() + spyOn(viewHistory, 'findView').and.callFake((startTime?: RelativeTime) => + startTime !== undefined ? viewByRelative[startTime as number] : undefined + ) + + startTracking(viewHistory) + notifyConnecting() + notifyClosed(startViewB, 1000, 'bye', true) + + const webSocket = completed[0] + expect(webSocket.startViewId).toBe('view-A') + expect(webSocket.endViewId).toBe('view-B') + }) + + it('flushOpenConnections finalizes still-open connections with tracking_end_reason="session_end"', () => { + const tracker = startTracking() + notifyConnecting() + notifyOpen(10) + notifyMessageIn(20, 1) + + tracker.flushOpenConnections('session_end') + + expect(completed.length).toBe(1) + expect(completed[0].trackingEndReason).toBe('session_end') + expect(completed[0].handshakeSucceeded).toBeTrue() + expect(completed[0].closeCode).toBeUndefined() + expect(completed[0].closeReason).toBeUndefined() + expect(completed[0].wasClean).toBeUndefined() + }) + + it('does not finalize twice when close arrives after flushOpenConnections', () => { + const tracker = startTracking() + notifyConnecting() + notifyOpen(10) + tracker.flushOpenConnections('session_end') + notifyClosed(20, 1000, 'bye', true) + + expect(completed.length).toBe(1) + expect(completed[0].trackingEndReason).toBe('session_end') + }) + + it('stop() unsubscribes from the observable and ignores further events', () => { + const tracker = startTracking() + notifyConnecting() + tracker.stop() + notifyClosed(20, 1000, 'bye', true) + + expect(completed.length).toBe(0) + }) + + describe('websocket-connecting vital', () => { + it('emits a duration-0 vital on connecting', () => { + const addDurationVital = jasmine.createSpy<(vital: DurationVital) => void>() + startTracking(mockViewHistory(), addDurationVital) + notifyConnecting() + + expect(addDurationVital).toHaveBeenCalledOnceWith( + jasmine.objectContaining({ + name: WEBSOCKET_CONNECTING_VITAL_NAME, + type: VitalType.DURATION, + duration: 0, + }) + ) + }) + + it('uses the same id as the subsequent WEBSOCKET_COMPLETED connectionId', () => { + const addDurationVital = jasmine.createSpy<(vital: DurationVital) => void>() + startTracking(mockViewHistory(), addDurationVital) + notifyConnecting() + notifyClosed(1, 1000, 'bye', true) + + expect(addDurationVital).toHaveBeenCalledOnceWith(jasmine.objectContaining({ id: completed[0].connectionId })) + }) + + it('includes url, protocols, and startViewId in the vital context', () => { + const startView = 0 + const relativeStartView = clock.relative(startView) + const viewByRelative: Record = { + [relativeStartView]: { id: 'view-start', startClocks: relativeToClocks(relativeStartView) }, + } + const viewHistory = mockViewHistory() + spyOn(viewHistory, 'findView').and.callFake((startTime?: RelativeTime) => + startTime !== undefined ? viewByRelative[startTime as number] : undefined + ) + const addDurationVital = jasmine.createSpy<(vital: DurationVital) => void>() + const url = 'wss://example.com/socket' + const protocols = ['chat.v1', 'json'] + + startTracking(viewHistory, addDurationVital) + notifyConnecting(startView, url, undefined, protocols) + + expect(addDurationVital).toHaveBeenCalledOnceWith( + jasmine.objectContaining({ + context: { + url, + protocols, + startViewId: 'view-start', + }, + }) + ) + }) + }) + + describe('startWebSocketCollection', () => { + const wsInstance = {} as WebSocket + const wsUrl = 'wss://example.com/socket' + + function notifyConnectionConnecting(offsetMs = 0) { + // initWebSocketObservable returns a singleton, it will notify the same instance for all calls + initWebSocketObservable().notify({ + state: 'connecting', + instance: wsInstance, + url: wsUrl, + startClocks: relativeToClocks(clock.relative(offsetMs)), + }) + } + + it('finalizes open connections with tracking_end_reason="session_end" when the session expires', () => { + const collection = startWebSocketCollection(lifeCycle, mockViewHistory(), jasmine.createSpy()) + registerCleanupTask(() => collection.stop()) + notifyConnectionConnecting() + + lifeCycle.notify(LifeCycleEventType.SESSION_EXPIRED) + + expect(completed.length).toBe(1) + expect(completed[0].trackingEndReason).toBe('session_end') + expect(completed[0].handshakeSucceeded).toBeFalse() + expect(completed[0].closeCode).toBeUndefined() + expect(completed[0].closeReason).toBeUndefined() + expect(completed[0].wasClean).toBeUndefined() + }) + + it('ignores further WebSocket events from the same instance after stop()', () => { + const collection = startWebSocketCollection(lifeCycle, mockViewHistory(), jasmine.createSpy()) + notifyConnectionConnecting() + collection.stop() + + const eventCountAfterStop = completed.length + + initWebSocketObservable().notify({ + state: 'closed', + instance: wsInstance, + code: 1000, + reason: 'bye', + wasClean: true, + at: relativeToClocks(clock.relative(1000)), + }) + + expect(completed.length).toBe(eventCountAfterStop) + }) + }) +}) diff --git a/packages/browser-rum-core/src/domain/webSocketCollection.ts b/packages/browser-rum-core/src/domain/webSocketCollection.ts new file mode 100644 index 0000000000..994d172eab --- /dev/null +++ b/packages/browser-rum-core/src/domain/webSocketCollection.ts @@ -0,0 +1,251 @@ +import type { Observable, WebSocketContext } from '@datadog/browser-core' +import { generateUUID, initWebSocketObservable, sanitize } from '@datadog/browser-core' +import type { ClocksState, Duration, TimeStamp } from '@datadog/js-core/time' +import { clocksNow, elapsed } from '@datadog/js-core/time' +import { VitalType } from '../rawRumEvent.types' +import type { ViewHistory } from './contexts/viewHistory' +import type { LifeCycle } from './lifeCycle' +import { LifeCycleEventType } from './lifeCycle' +import type { DurationVital } from './vital/vitalCollection' + +export const WEBSOCKET_CONNECTING_VITAL_NAME = 'websocket-connecting' + +export type WebSocketTrackingEndReason = 'close_event' | 'session_end' + +export interface WebSocketCompleteEvent { + connectionId: string + url: string + protocol?: string + startClocks: ClocksState + endClocks: ClocksState + startViewId?: string + endViewId?: string + messagesIn: { count: number; size: number } + messagesOut: { count: number; size: number } + firstMessageInOffset?: Duration + firstMessageOutOffset?: Duration + lastMessageInAt?: TimeStamp + longestInboundSilence: Duration + bufferedAmountMax: number + inboundIdleDurationBeforeClose?: Duration + closeCode?: number + closeReason?: string + wasClean?: boolean + handshakeSucceeded: boolean + trackingEndReason: WebSocketTrackingEndReason + setupDuration: Duration +} + +interface WebSocketConnection { + connectionId: string + url: string + protocol?: string + startClocks: ClocksState + openClocks?: ClocksState + startViewId?: string + messagesIn: { count: number; size: number } + messagesOut: { count: number; size: number } + firstMessageInOffset?: Duration + firstMessageOutOffset?: Duration + lastMessageInAt?: TimeStamp + longestInboundSilence: Duration + bufferedAmountMax: number + setupDuration?: Duration +} + +export interface WebSocketConnectionTracker { + flushOpenConnections: (reason: WebSocketTrackingEndReason) => void + stop: () => void +} + +export function startWebSocketCollection( + lifeCycle: LifeCycle, + viewHistory: ViewHistory, + addDurationVital: (vital: DurationVital) => void +) { + const tracker = trackWebSocket(lifeCycle, initWebSocketObservable(), viewHistory, addDurationVital) + + const sessionExpiredSubscription = lifeCycle.subscribe(LifeCycleEventType.SESSION_EXPIRED, () => { + tracker.flushOpenConnections('session_end') + }) + + return { + stop: () => { + sessionExpiredSubscription.unsubscribe() + tracker.flushOpenConnections('session_end') + tracker.stop() + }, + } +} + +export function trackWebSocket( + lifeCycle: LifeCycle, + webSocketContextObservable: Observable, + viewHistory: ViewHistory, + addDurationVital: (vital: DurationVital) => void +): WebSocketConnectionTracker { + const webSocketRegistry = new Map() + + const subscription = webSocketContextObservable.subscribe((context) => { + switch (context.state) { + case 'connecting': { + const connectionId = generateUUID() + const startViewId = viewHistory.findView(context.startClocks.relative)?.id + const webSocket: WebSocketConnection = { + connectionId, + url: context.url, + startClocks: context.startClocks, + startViewId, + messagesIn: { count: 0, size: 0 }, + messagesOut: { count: 0, size: 0 }, + longestInboundSilence: 0 as Duration, + bufferedAmountMax: 0, + } + webSocketRegistry.set(context.instance, webSocket) + + addDurationVital({ + id: connectionId, + name: WEBSOCKET_CONNECTING_VITAL_NAME, + type: VitalType.DURATION, + startClocks: context.startClocks, + duration: 0 as Duration, + context: sanitize({ + url: context.url, + protocols: context.protocols, + startViewId, + }), + }) + return + } + + case 'open': { + const webSocket = webSocketRegistry.get(context.instance) + if (!webSocket) { + return + } + + webSocket.openClocks = context.openClocks + webSocket.protocol = context.protocol + webSocket.setupDuration = elapsed(webSocket.startClocks.timeStamp, context.openClocks.timeStamp) + + return + } + + case 'message-in': { + const webSocket = webSocketRegistry.get(context.instance) + if (!webSocket) { + return + } + + webSocket.messagesIn.count += 1 + webSocket.messagesIn.size += context.size + recordMessageTiming(webSocket, context.at, 'in') + + return + } + + case 'message-out': { + const webSocket = webSocketRegistry.get(context.instance) + if (!webSocket) { + return + } + + if (context.bufferedAmountPreSend > webSocket.bufferedAmountMax) { + webSocket.bufferedAmountMax = context.bufferedAmountPreSend + } + webSocket.messagesOut.count += 1 + webSocket.messagesOut.size += context.size + recordMessageTiming(webSocket, context.at, 'out') + + return + } + + case 'closed': { + const webSocket = webSocketRegistry.get(context.instance) + if (!webSocket) { + return + } + + webSocketRegistry.delete(context.instance) + + lifeCycle.notify( + LifeCycleEventType.WEBSOCKET_COMPLETED, + buildCompletedEvent(webSocket, context, 'close_event', viewHistory) + ) + + return + } + } + }) + + return { + flushOpenConnections: (reason) => { + const at = clocksNow() + + webSocketRegistry.forEach((webSocket) => { + lifeCycle.notify( + LifeCycleEventType.WEBSOCKET_COMPLETED, + buildCompletedEvent(webSocket, { at }, reason, viewHistory) + ) + }) + + webSocketRegistry.clear() + }, + stop: () => { + subscription.unsubscribe() + webSocketRegistry.clear() + }, + } +} + +function recordMessageTiming(webSocket: WebSocketConnection, at: ClocksState, direction: 'in' | 'out') { + const handshakeFailed = webSocket.openClocks === undefined + + if (handshakeFailed) { + return + } + + const offset = elapsed(webSocket.openClocks!.timeStamp, at.timeStamp) + if (direction === 'in' && webSocket.firstMessageInOffset === undefined) { + webSocket.firstMessageInOffset = offset + } else if (direction === 'out' && webSocket.firstMessageOutOffset === undefined) { + webSocket.firstMessageOutOffset = offset + } + + if (direction === 'in') { + if (webSocket.lastMessageInAt !== undefined) { + const gap = elapsed(webSocket.lastMessageInAt, at.timeStamp) + if (gap > webSocket.longestInboundSilence) { + webSocket.longestInboundSilence = gap + } + } + webSocket.lastMessageInAt = at.timeStamp + } +} + +function buildCompletedEvent( + webSocket: WebSocketConnection, + endInfo: { at: ClocksState; code?: number; reason?: string; wasClean?: boolean }, + trackingEndReason: WebSocketTrackingEndReason, + viewHistory: ViewHistory +): WebSocketCompleteEvent { + const endClocks = endInfo.at + const endViewId = viewHistory.findView(endClocks.relative)?.id + const inboundIdleDurationBeforeClose = + webSocket.lastMessageInAt !== undefined ? elapsed(webSocket.lastMessageInAt, endClocks.timeStamp) : undefined + + const { openClocks, ...rest } = webSocket + + return { + ...rest, + endClocks, + endViewId, + inboundIdleDurationBeforeClose, + trackingEndReason, + handshakeSucceeded: openClocks !== undefined, + setupDuration: webSocket.setupDuration ?? elapsed(webSocket.startClocks.timeStamp, endClocks.timeStamp), + closeCode: endInfo.code, + closeReason: endInfo.reason, + wasClean: endInfo.wasClean, + } +} diff --git a/packages/browser-rum-core/src/domainContext.types.ts b/packages/browser-rum-core/src/domainContext.types.ts index 9b18e87a47..5d81c93d74 100644 --- a/packages/browser-rum-core/src/domainContext.types.ts +++ b/packages/browser-rum-core/src/domainContext.types.ts @@ -9,7 +9,7 @@ export type RumEventDomainContext = T extends type : T extends typeof RumEventType.ACTION ? RumActionEventDomainContext : T extends typeof RumEventType.RESOURCE - ? RumResourceEventDomainContext | RumManualResourceEventDomainContext + ? RumResourceEventDomainContext | RumManualResourceEventDomainContext | RumWebSocketResourceEventDomainContext : T extends typeof RumEventType.ERROR ? RumErrorEventDomainContext : T extends typeof RumEventType.LONG_TASK @@ -48,6 +48,8 @@ export interface RumManualResourceEventDomainContext { isManual: true } +export type RumWebSocketResourceEventDomainContext = Record + export interface RumErrorEventDomainContext { error: unknown handlingStack?: string diff --git a/packages/browser-rum-core/src/rawRumEvent.types.ts b/packages/browser-rum-core/src/rawRumEvent.types.ts index a115388795..45c0871263 100644 --- a/packages/browser-rum-core/src/rawRumEvent.types.ts +++ b/packages/browser-rum-core/src/rawRumEvent.types.ts @@ -73,6 +73,7 @@ export interface RawRumResourceEvent { protocol?: string delivery_type?: DeliveryType graphql?: GraphQlMetadata + websocket?: WebSocketResourceProperties request?: ResourceRequest response?: ResourceResponse } @@ -86,6 +87,29 @@ export interface RawRumResourceEvent { context?: Context } +export interface WebSocketResourceProperties { + connection_id: string + handshake_succeeded: boolean + start_time: TimeStamp + end_time: TimeStamp + start_view_id?: string + end_view_id?: string + tracking_end_reason: 'close_event' | 'session_end' + close_code?: number + close_reason?: string + was_clean?: boolean + messages_in: { count: number; size: number } + messages_out: { count: number; size: number } + time_to_first_message_in?: ServerDuration + time_to_first_message_out?: ServerDuration + last_message_in_at?: TimeStamp + longest_inbound_silence: ServerDuration + inbound_idle_duration_before_close?: ServerDuration + buffered_amount_max: number + protocol?: string + setup_duration: ServerDuration +} + export type NetworkHeaders = Record export interface ResourceRequest { diff --git a/packages/browser-rum-core/test/formatValidation.ts b/packages/browser-rum-core/test/formatValidation.ts index 8387ee05d0..a5b7369b83 100644 --- a/packages/browser-rum-core/test/formatValidation.ts +++ b/packages/browser-rum-core/test/formatValidation.ts @@ -1,11 +1,12 @@ -import Ajv from 'ajv' -import { registerCleanupTask } from '@datadog/browser-core/test' import type { Context } from '@datadog/browser-core' +import { ResourceType } from '@datadog/browser-core' +import { registerCleanupTask } from '@datadog/browser-core/test' import { combine } from '@datadog/js-core/util' import type { CommonProperties } from '@datadog/browser-rum-core' +import Ajv from 'ajv' import type { LifeCycle, RawRumEventCollectedData } from '../src/domain/lifeCycle' import { LifeCycleEventType } from '../src/domain/lifeCycle' -import type { RawRumEvent } from '../src/rawRumEvent.types' +import { RumEventType, type RawRumEvent } from '../src/rawRumEvent.types' import { allJsonSchemas } from './allJsonSchemas' export function collectAndValidateRawRumEvents(lifeCycle: LifeCycle) { @@ -22,6 +23,11 @@ export function collectAndValidateRawRumEvents(lifeCycle: LifeCycle) { } function validateRumEventFormat(rawRumEvent: RawRumEvent) { + // TODO: Remove this once we have a proper websocket schema + if (rawRumEvent.type === RumEventType.RESOURCE && rawRumEvent.resource.type === ResourceType.WEBSOCKET) { + return + } + const fakeId = '00000000-aaaa-0000-aaaa-000000000000' const fakeContext: Partial = { _dd: { diff --git a/scripts/dev-server/lib/server.ts b/scripts/dev-server/lib/server.ts index c045a925a8..d19330af71 100644 --- a/scripts/dev-server/lib/server.ts +++ b/scripts/dev-server/lib/server.ts @@ -31,6 +31,7 @@ export function runServer({ writeIntakeFile = true }: { writeIntakeFile?: boolea app.use((_req, res, next) => { res.setHeader('Document-Policy', 'js-profiling') + res.setHeader('Access-Control-Allow-Origin', '*') next() }) diff --git a/test/e2e/lib/framework/httpServers.ts b/test/e2e/lib/framework/httpServers.ts index 8049643be4..5379bd04e8 100644 --- a/test/e2e/lib/framework/httpServers.ts +++ b/test/e2e/lib/framework/httpServers.ts @@ -1,5 +1,6 @@ import * as http from 'http' import type { AddressInfo } from 'net' +import type { Duplex } from 'stream' import { test } from '@playwright/test' import type { Browser } from '@playwright/test' import { getIp } from '../../../envUtils' @@ -10,10 +11,14 @@ const MAX_SERVER_CREATION_RETRY = 5 const PORT_MIN = 9200 const PORT_MAX = 9400 -export type ServerApp = (req: http.IncomingMessage, res: http.ServerResponse) => any +export interface ServerApp { + (req: http.IncomingMessage, res: http.ServerResponse): any + handleUpgrade?: (req: http.IncomingMessage, socket: Duplex, head: Buffer) => void +} export type MockServerApp = ServerApp & { getLargeResponseWroteSize(): number + closeEchoWebSockets?: () => void } export interface Server { @@ -69,6 +74,14 @@ async function createServer(): Promise> { } }) + server.on('upgrade', (req: http.IncomingMessage, socket: Duplex, head: Buffer) => { + if (serverApp?.handleUpgrade) { + serverApp.handleUpgrade(req, socket, head) + } else { + socket.destroy() + } + }) + return { bindServerApp(newServerApp: App) { serverApp = newServerApp diff --git a/test/e2e/lib/framework/serverApps/mock.ts b/test/e2e/lib/framework/serverApps/mock.ts index 148f813600..461c592443 100644 --- a/test/e2e/lib/framework/serverApps/mock.ts +++ b/test/e2e/lib/framework/serverApps/mock.ts @@ -1,5 +1,8 @@ -import type { ServerResponse } from 'http' +import type { IncomingMessage, ServerResponse } from 'http' +import type { Duplex } from 'stream' import * as url from 'url' +import type { WebSocket } from 'ws' +import { WebSocketServer } from 'ws' import cors from 'cors' import qs from 'qs' import express from 'express' @@ -8,6 +11,7 @@ import type { MockServerApp, Servers } from '../httpServers' import { DEV_SERVER_BASE_URL } from '../../helpers/playwright' import type { SetupOptions } from '../pageSetups' import { workerSetup } from '../pageSetups' +import { rawDataToString } from '../../helpers/rawDataToString' export const LARGE_RESPONSE_MIN_BYTE_SIZE = 100_000 @@ -16,6 +20,8 @@ export function createMockServerApp(servers: Servers, setup: string, setupOption const app = express() let largeResponseBytesWritten = 0 + const { webSocketUrl, handleUpgrade, closeEchoWebSockets } = setupEchoWebSockets(servers.base.origin) + app.use(cors()) app.disable('etag') // disable automatic resource caching @@ -173,7 +179,7 @@ export function createMockServerApp(servers: Servers, setup: string, setupOption res.header( 'Content-Security-Policy', [ - `connect-src ${servers.datadogHttpApi.origin} ${servers.base.origin} ${servers.crossOrigin.origin} https://quota.browser-intake-datadoghq.com`, + `connect-src ${servers.datadogHttpApi.origin} ${servers.base.origin} ${webSocketUrl} ${servers.crossOrigin.origin} https://quota.browser-intake-datadoghq.com`, `script-src 'self' 'unsafe-inline' ${servers.crossOrigin.origin}`, "worker-src blob: 'self'", ].join(';') @@ -235,6 +241,8 @@ export function createMockServerApp(servers: Servers, setup: string, setupOption getLargeResponseWroteSize() { return largeResponseBytesWritten }, + handleUpgrade, + closeEchoWebSockets, }) } @@ -261,3 +269,44 @@ function forwardToDevServer(originalUrl: string, res: ServerResponse) { }) .catch(() => console.error(`Error fetching ${url}, did you run 'yarn dev'?`)) } + +function setupEchoWebSockets(httpOrigin: string) { + const webSocketUrl = httpOriginToWebsocketUrl(httpOrigin) + + const echoWebSockets = new Set() + const wss = new WebSocketServer({ noServer: true }) + wss.on('connection', (ws) => { + echoWebSockets.add(ws) + ws.on('close', () => { + echoWebSockets.delete(ws) + }) + ws.on('message', (data) => { + ws.send(`echo: ${rawDataToString(data)}`) + }) + }) + + function httpOriginToWebsocketUrl(httpOrigin: string) { + const parsed = new URL(httpOrigin) + const wsScheme = parsed.protocol === 'https:' ? 'wss:' : 'ws:' + return `${wsScheme}//${parsed.host}` + } + + function handleUpgrade(req: IncomingMessage, socket: Duplex, head: Buffer) { + const pathname = new URL(req.url || '/', 'http://localhost').pathname + if (pathname === '/ws-echo') { + wss.handleUpgrade(req, socket, head, (ws) => { + wss.emit('connection', ws, req) + }) + } else { + socket.destroy() + } + } + + function closeEchoWebSockets() { + for (const client of echoWebSockets) { + client.close() + } + } + + return { webSocketUrl, handleUpgrade, closeEchoWebSockets } +} diff --git a/test/e2e/lib/helpers/rawDataToString.ts b/test/e2e/lib/helpers/rawDataToString.ts new file mode 100644 index 0000000000..56fdd45ce0 --- /dev/null +++ b/test/e2e/lib/helpers/rawDataToString.ts @@ -0,0 +1,12 @@ +import { Buffer } from 'node:buffer' +import type { RawData } from 'ws' + +export function rawDataToString(data: RawData): string { + if (Array.isArray(data)) { + return Buffer.concat(data).toString('utf8') + } + if (Buffer.isBuffer(data)) { + return data.toString('utf8') + } + return Buffer.from(data).toString('utf8') +} diff --git a/test/e2e/lib/helpers/validation.ts b/test/e2e/lib/helpers/validation.ts index b25ba843f8..f1e7ed8c0b 100644 --- a/test/e2e/lib/helpers/validation.ts +++ b/test/e2e/lib/helpers/validation.ts @@ -1,10 +1,13 @@ import fs from 'fs' import * as path from 'node:path' -import type { RumEvent } from '@datadog/browser-rum' +import type { RumEvent, RumResourceEvent } from '@datadog/browser-rum' import ajv from 'ajv' +import { RumEventType } from '@datadog/browser-rum-core' const schemasDir = path.join(path.dirname(require.resolve('@datadog/rum-events-format/package.json')), 'schemas') +type RumResourceEventTypeWithWebSocket = RumResourceEvent['resource']['type'] | 'websocket' + export function validateRumFormat(events: RumEvent[]) { const instance = new ajv({ allErrors: true, @@ -15,6 +18,13 @@ export function validateRumFormat(events: RumEvent[]) { instance.addSchema(allJsonSchemas) events.forEach((rumEvent) => { + // TODO: remove this once we introduce websockets on rum-events-format + if ( + rumEvent.type === RumEventType.RESOURCE && + (rumEvent.resource.type as RumResourceEventTypeWithWebSocket) === 'websocket' + ) { + return + } void instance.validate('rum-events-schema.json', rumEvent) if (instance.errors) { diff --git a/test/e2e/playwright.config.ts b/test/e2e/playwright.config.ts index 9ffd74279e..cca55095dd 100644 --- a/test/e2e/playwright.config.ts +++ b/test/e2e/playwright.config.ts @@ -96,11 +96,12 @@ const baseWebServers = [ name: 'nuxt app', stdout: 'pipe' as const, cwd: path.join(__dirname, '../apps/nuxt-app'), - command: isLocal ? 'yarn dev' : 'yarn start', - env: { NO_COLOR: '1' }, + // `nuxt dev` shares one vite-node IPC server; parallel Playwright workers can trigger + // intermittent EINVAL / aborted navigations. Preview matches CI and tolerates concurrency. + command: 'yarn start', + env: { NO_COLOR: '1', PORT: '3100', NITRO_PORT: '3100' }, wait: { - // yarn dev logs: "➜ Local: http://localhost:PORT" - // yarn start logs: "Listening on http://[::]:PORT" + // yarn preview logs: "Listening on http://[::]:PORT" (or localhost in some versions) stdout: /(?:Local:\s+http:\/\/localhost|Listening on http:\/\/(?:\[[^\]]+\]|[^:]+)):(?\d+)/, }, }, @@ -108,8 +109,8 @@ const baseWebServers = [ name: 'nuxt vue router v4 app', stdout: 'pipe' as const, cwd: path.join(__dirname, '../apps/nuxt-vue-router-v4-app'), - command: isLocal ? 'yarn dev' : 'yarn start', - env: { NO_COLOR: '1', PORT: '3001', NITRO_PORT: '3001' }, + command: 'yarn start', + env: { NO_COLOR: '1', PORT: '3200', NITRO_PORT: '3200' }, wait: { stdout: /(?:Local:\s+http:\/\/localhost|Listening on http:\/\/(?:\[[^\]]+\]|[^:]+)):(?\d+)/, diff --git a/test/e2e/scenario/rum/websockets.scenario.ts b/test/e2e/scenario/rum/websockets.scenario.ts new file mode 100644 index 0000000000..94f0d75059 --- /dev/null +++ b/test/e2e/scenario/rum/websockets.scenario.ts @@ -0,0 +1,177 @@ +import type { RumResourceEvent } from '@datadog/browser-rum' +import type { RawRumEvent } from '@datadog/browser-rum-core' +import { expect, test } from '@playwright/test' +import { createTest, html } from '../../lib/framework' +import { expireSession } from '../../lib/helpers/session' + +const KNOWN_OUT_WS_MESSAGE = 'e2e-ws-ping' +const KNOWN_IN_WS_MESSAGE = `echo: ${KNOWN_OUT_WS_MESSAGE}` + +const WEBSOCKET_TEST_BODY = html` +

+

+ + + + + +` + +type RawRumResource = Extract +type WebSocketResourceProperties = NonNullable + +/** + * RUM resource event for our /ws-echo fixture with `resource.websocket` populated. Public + * {@link RumResourceEvent} omits `websocket` until rum-events-format is updated — use + * {@link isWebSocketResource} instead of casting at every filter/call site. + */ +type RumResourceEventWithWebSocket = RumResourceEvent & { + resource: RumResourceEvent['resource'] & { + websocket: WebSocketResourceProperties + } +} + +test.describe('rum websockets', () => { + createTest('collect websocket-connecting vital and websocket resource when the connection closes') + .withRum({ enableExperimentalFeatures: ['track_web_sockets'] }) + .withBody(WEBSOCKET_TEST_BODY) + .run(async ({ intakeRegistry, flushEvents, page }) => { + await page.locator('#ws-open').click() + await expect(page.locator('#ws-status')).toHaveText('open') + await page.locator('#ws-send').click() + await expect(page.locator('#ws-last-message')).toHaveText(KNOWN_IN_WS_MESSAGE) + await page.locator('#ws-close-client').click() + await expect(page.locator('#ws-status')).toContainText('closed') + + await flushEvents() + + const connectingVital = intakeRegistry.rumVitalEvents.find((e) => e.vital.name === 'websocket-connecting') + expect(connectingVital).toBeDefined() + + const rumEvent = getLastRumResourceEventWithWebSocket(intakeRegistry.rumResourceEvents) + expect(rumEvent).toBeDefined() + + const { websocket: ws } = rumEvent!.resource + + expect(ws.connection_id).toBe(connectingVital!.vital.id) + expect(ws.tracking_end_reason).toBe('close_event') + expect(ws.messages_out.count).toBe(1) + expect(ws.messages_out.size).toBe(KNOWN_OUT_WS_MESSAGE.length) + expect(ws.messages_in.count).toBe(1) + expect(ws.messages_in.size).toBe(KNOWN_IN_WS_MESSAGE.length) + }) + + createTest('websocket resource ends with close_event when the server closes the echo socket') + .withRum({ enableExperimentalFeatures: ['track_web_sockets'] }) + .withBody(WEBSOCKET_TEST_BODY) + .run(async ({ intakeRegistry, flushEvents, page, servers }) => { + await page.locator('#ws-open').click() + await expect(page.locator('#ws-status')).toHaveText('open') + + servers.base.app.closeEchoWebSockets!() + await expect(page.locator('#ws-status')).toContainText('closed') + + await flushEvents() + + const rumEvent = getLastRumResourceEventWithWebSocket(intakeRegistry.rumResourceEvents) + expect(rumEvent).toBeDefined() + + expect(rumEvent!.resource.websocket.tracking_end_reason).toBe('close_event') + }) + + createTest('websocket resource is reported with session_end when the session expires') + .withRum({ enableExperimentalFeatures: ['track_web_sockets'] }) + .withBody(WEBSOCKET_TEST_BODY) + .run(async ({ intakeRegistry, flushEvents, page, browserContext }) => { + await page.locator('#ws-open').click() + await expect(page.locator('#ws-status')).toHaveText('open') + await expireSession(page, browserContext) + + await flushEvents() + + const wsWithSessionEnd = getWebSocketResources(intakeRegistry.rumResourceEvents).find( + (e) => e.resource.websocket.tracking_end_reason === 'session_end' + ) + expect(wsWithSessionEnd).toBeDefined() + }) + + createTest('websocket resource records different start and end views when it spanned multiple views') + .withRum({ enableExperimentalFeatures: ['track_web_sockets'] }) + .withBody(WEBSOCKET_TEST_BODY) + .run(async ({ intakeRegistry, flushEvents, page }) => { + await page.evaluate(() => { + window.DD_RUM!.startView('view-a') + }) + await page.locator('#ws-open').click() + await expect(page.locator('#ws-status')).toHaveText('open') + await page.evaluate(() => { + window.DD_RUM!.startView('view-b') + }) + await page.locator('#ws-close-client').click() + + await flushEvents() + + const rumEvent = getLastRumResourceEventWithWebSocket(intakeRegistry.rumResourceEvents) + expect(rumEvent).toBeDefined() + + const { websocket: ws } = rumEvent!.resource + + expect(ws.start_view_id).toBeDefined() + expect(ws.end_view_id).toBeDefined() + expect(ws.start_view_id).not.toBe(ws.end_view_id) + }) +}) + +function isWebSocketResource(event: RumResourceEvent): event is RumResourceEventWithWebSocket { + // Public RumResourceEvent.resource omits `websocket` until rum-events-format is updated. + const resource = event.resource as unknown as { + url: unknown + type?: string + websocket?: WebSocketResourceProperties + } + + return resource.type === 'websocket' && resource.websocket !== null +} + +function getWebSocketResources(events: RumResourceEvent[]): RumResourceEventWithWebSocket[] { + return events.filter(isWebSocketResource) +} + +function getLastRumResourceEventWithWebSocket(events: RumResourceEvent[]): RumResourceEventWithWebSocket | undefined { + const list = getWebSocketResources(events) + return list.length === 0 ? undefined : list[list.length - 1] +} diff --git a/test/e2e/scripts/pinnedProxy.ts b/test/e2e/scripts/pinnedProxy.ts index 209780a076..1791fa2c8c 100644 --- a/test/e2e/scripts/pinnedProxy.ts +++ b/test/e2e/scripts/pinnedProxy.ts @@ -17,9 +17,10 @@ import http from 'node:http' import process from 'node:process' -import { Buffer } from 'node:buffer' import { WebSocketServer, WebSocket, type RawData } from 'ws' +import { rawDataToString } from '../lib/helpers/rawDataToString.ts' + interface JsonRpcMessage { method?: string guid?: string @@ -135,16 +136,6 @@ httpServer.listen(LISTEN, '127.0.0.1', () => { console.log(`[pinnedProxy] listening on ws://127.0.0.1:${LISTEN}/ -> ws://${UPSTREAM}/`) }) -function rawDataToString(data: RawData): string { - if (Array.isArray(data)) { - return Buffer.concat(data).toString('utf8') - } - if (Buffer.isBuffer(data)) { - return data.toString('utf8') - } - return Buffer.from(data).toString('utf8') -} - function forwardHeader(headers: http.IncomingHttpHeaders, name: string): Record { const value = headers[name] return typeof value === 'string' ? { [name]: value } : {}