diff --git a/packages/browser-rum-core/src/transport/startRumBatch.spec.ts b/packages/browser-rum-core/src/transport/startRumBatch.spec.ts index 3d196398b0..74fe5e44a7 100644 --- a/packages/browser-rum-core/src/transport/startRumBatch.spec.ts +++ b/packages/browser-rum-core/src/transport/startRumBatch.spec.ts @@ -1,8 +1,18 @@ -import type { RumViewEvent } from '../rumEvent.types' +import { ExperimentalFeature, Observable, addExperimentalFeatures } from '@datadog/browser-core' +import { resetExperimentalFeatures } from '@datadog/browser-core/src/tools/experimentalFeatures' +import type { FlushController, FlushEvent } from '@datadog/browser-core/src/transport/flushController' +import { registerCleanupTask } from '@datadog/browser-core/test' +import type { AssembledRumEvent } from '../rawRumEvent.types' import { RumEventType } from '../rawRumEvent.types' -import { assembleViewUpdateEvent } from './startRumBatch' -function makeViewEvent(overrides: Record = {}): RumViewEvent { +type AssembledViewEvent = Extract +import { + computeAssembledViewDiff, + createViewBatchRouter, + PARTIAL_VIEW_UPDATE_CHECKPOINT_INTERVAL, +} from './startRumBatch' + +function makeAssembledView(overrides: Record = {}): AssembledViewEvent { return { type: RumEventType.VIEW, date: 1000, @@ -32,13 +42,13 @@ function makeViewEvent(overrides: Record = {}): RumViewEvent { source: 'browser', context: {}, ...overrides, - } as unknown as RumViewEvent + } as unknown as AssembledViewEvent } -describe('assembleViewUpdateEvent', () => { +describe('computeAssembledViewDiff', () => { it('should return undefined when nothing has changed', () => { - const last = makeViewEvent() - const current = makeViewEvent({ + const last = makeAssembledView() + const current = makeAssembledView({ _dd: { document_version: 2, format_version: 2, @@ -46,7 +56,7 @@ describe('assembleViewUpdateEvent', () => { configuration: { start_session_replay_recording_manually: false }, }, }) - const result = assembleViewUpdateEvent(current, last) + const result = computeAssembledViewDiff(current, last) // Only document_version changed (always required, not a "meaningful change") // view.* unchanged → should return undefined @@ -54,8 +64,8 @@ describe('assembleViewUpdateEvent', () => { }) it('should always include required routing fields', () => { - const last = makeViewEvent() - const current = makeViewEvent({ + const last = makeAssembledView() + const current = makeAssembledView({ _dd: { document_version: 2, format_version: 2, @@ -75,20 +85,20 @@ describe('assembleViewUpdateEvent', () => { time_spent: 100, }, }) - const result = assembleViewUpdateEvent(current, last)! + const result = computeAssembledViewDiff(current, last)! - expect(result.type).toBe('view_update') - expect(result.application).toEqual({ id: 'app-1' }) - expect(result.session).toEqual({ id: 'sess-1', type: 'user' }) - expect(result.view.id).toBe('view-1') - expect(result.view.url).toBe('/home') - expect(result._dd?.document_version).toBe(2) - expect(result._dd?.format_version).toBe(2) + expect(result.type).toBe(RumEventType.VIEW_UPDATE) + expect((result as any).application).toEqual({ id: 'app-1' }) + expect((result as any).session).toEqual({ id: 'sess-1', type: 'user' }) + expect((result.view as any).id).toBe('view-1') + expect((result.view as any).url).toBe('/home') + expect((result._dd as any).document_version).toBe(2) + expect((result._dd as any).format_version).toBe(2) }) it('should include only changed view.* fields', () => { - const last = makeViewEvent() - const current = makeViewEvent({ + const last = makeAssembledView() + const current = makeAssembledView({ _dd: { document_version: 2, format_version: 2, @@ -108,18 +118,18 @@ describe('assembleViewUpdateEvent', () => { time_spent: 5000, }, }) - const result = assembleViewUpdateEvent(current, last)! + const result = computeAssembledViewDiff(current, last)! - expect(result.view.action).toEqual({ count: 3 }) // changed - expect(result.view.time_spent).toBe(5000) // changed - expect(result.view.error).toBeUndefined() // unchanged, stripped - expect(result.view.name).toBeUndefined() // unchanged, stripped - expect(result.view.url).toBe('/home') // required routing field, always present + expect((result.view as any).action).toEqual({ count: 3 }) // changed + expect((result.view as any).time_spent).toBe(5000) // changed + expect((result.view as any).error).toBeUndefined() // unchanged, stripped + expect((result.view as any).name).toBeUndefined() // unchanged, stripped + expect((result.view as any).url).toBe('/home') // required routing field, always present }) it('should strip unchanged top-level assembled fields', () => { - const last = makeViewEvent({ service: 'svc', version: '1.0.0' }) - const current = makeViewEvent({ + const last = makeAssembledView({ service: 'svc', version: '1.0.0' }) + const current = makeAssembledView({ _dd: { document_version: 2, format_version: 2, @@ -141,15 +151,15 @@ describe('assembleViewUpdateEvent', () => { service: 'svc', version: '1.0.0', }) - const result = assembleViewUpdateEvent(current, last)! + const result = computeAssembledViewDiff(current, last)! expect(result.service).toBeUndefined() // unchanged, stripped - expect(result.version).toBeUndefined() // unchanged, stripped + expect((result as any).version).toBeUndefined() // unchanged, stripped }) it('should keep top-level assembled fields that changed', () => { - const last = makeViewEvent({ service: 'old-service' }) - const current = makeViewEvent({ + const last = makeAssembledView({ service: 'old-service' }) + const current = makeAssembledView({ _dd: { document_version: 2, format_version: 2, @@ -170,14 +180,14 @@ describe('assembleViewUpdateEvent', () => { }, service: 'new-service', }) - const result = assembleViewUpdateEvent(current, last)! + const result = computeAssembledViewDiff(current, last)! expect(result.service).toBe('new-service') }) it('should not mutate the input events', () => { - const last = makeViewEvent() - const current = makeViewEvent({ + const last = makeAssembledView() + const current = makeAssembledView({ _dd: { document_version: 2, format_version: 2, @@ -198,8 +208,418 @@ describe('assembleViewUpdateEvent', () => { }, }) const currentService = current.service - assembleViewUpdateEvent(current, last) + computeAssembledViewDiff(current, last) expect(current.service).toBe(currentService) }) }) + +describe('startRumBatch partial_view_updates routing', () => { + beforeEach(() => { + addExperimentalFeatures([ExperimentalFeature.PARTIAL_VIEW_UPDATES]) + registerCleanupTask(resetExperimentalFeatures) + }) + + it('PARTIAL_VIEW_UPDATE_CHECKPOINT_INTERVAL should be 100', () => { + expect(PARTIAL_VIEW_UPDATE_CHECKPOINT_INTERVAL).toBe(100) + }) +}) + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +function createMockBatch() { + const flushObservable = new Observable() + + const addSpy = jasmine.createSpy<(message: object) => void>('add') + const upsertSpy = jasmine.createSpy<(message: object, key: string) => void>('upsert') + + const batch = { + flushController: { flushObservable } as unknown as FlushController, + add: addSpy, + upsert: upsertSpy, + } + + return { + batch, + addSpy, + upsertSpy, + flush: () => flushObservable.notify({ reason: 'bytes_limit', bytesCount: 0, messagesCount: 0 }), + } +} + +function makeView(viewId: string, docVersion: number, overrides: Record = {}): AssembledRumEvent { + return makeAssembledView({ + view: { + id: viewId, + name: 'Home', + url: '/home', + referrer: '', + is_active: true, + action: { count: 0 }, + error: { count: 0 }, + long_task: { count: 0 }, + resource: { count: 0 }, + time_spent: 0, + }, + _dd: { + document_version: docVersion, + format_version: 2, + sdk_name: 'rum', + configuration: { start_session_replay_recording_manually: false }, + }, + ...overrides, + }) +} + +// --------------------------------------------------------------------------- +// createViewBatchRouter tests +// --------------------------------------------------------------------------- + +describe('createViewBatchRouter', () => { + describe('feature flag OFF', () => { + it('should upsert full VIEW events (legacy behaviour)', () => { + resetExperimentalFeatures() + registerCleanupTask(resetExperimentalFeatures) + + const { batch, upsertSpy } = createMockBatch() + const route = createViewBatchRouter(batch) + + const v1 = makeView('view-1', 1) + const v2 = makeView('view-1', 2) + route(v1) + route(v2) + + expect(upsertSpy.calls.count()).toBe(2) + expect((upsertSpy.calls.argsFor(0)[0] as AssembledRumEvent).type).toBe(RumEventType.VIEW) + expect((upsertSpy.calls.argsFor(1)[0] as AssembledRumEvent).type).toBe(RumEventType.VIEW) + }) + }) + + describe('non-view events', () => { + beforeEach(() => { + addExperimentalFeatures([ExperimentalFeature.PARTIAL_VIEW_UPDATES]) + registerCleanupTask(resetExperimentalFeatures) + }) + + it('should always append non-view events', () => { + const { batch, addSpy, upsertSpy } = createMockBatch() + const route = createViewBatchRouter(batch) + + const action = { type: RumEventType.ACTION } as unknown as AssembledRumEvent + route(action) + + expect(addSpy.calls.count()).toBe(1) + expect(upsertSpy.calls.count()).toBe(0) + }) + }) + + describe('optimization 1 — VIEW already in batch', () => { + beforeEach(() => { + addExperimentalFeatures([ExperimentalFeature.PARTIAL_VIEW_UPDATES]) + registerCleanupTask(resetExperimentalFeatures) + }) + + it('should upsert the latest full VIEW for every intermediate update', () => { + const { batch, upsertSpy } = createMockBatch() + const route = createViewBatchRouter(batch) + + const v1 = makeView('view-1', 1) + const v2 = makeView('view-1', 2, { + view: { + id: 'view-1', + url: '/home', + referrer: '', + is_active: true, + action: { count: 1 }, + error: { count: 0 }, + long_task: { count: 0 }, + resource: { count: 0 }, + time_spent: 0, + }, + }) + route(v1) // initial — sets batchHasFullView + route(v2) // intermediate — opt 1 + + // Both calls must be upsert with full VIEW type + expect(upsertSpy.calls.count()).toBe(2) + expect((upsertSpy.calls.argsFor(0)[0] as AssembledRumEvent).type).toBe(RumEventType.VIEW) + expect((upsertSpy.calls.argsFor(1)[0] as AssembledRumEvent).type).toBe(RumEventType.VIEW) + // Both use the same key (view ID) + expect(upsertSpy.calls.argsFor(0)[1]).toBe('view-1') + expect(upsertSpy.calls.argsFor(1)[1]).toBe('view-1') + }) + + it('should not emit any view_update events while the VIEW is in the batch', () => { + const { batch, upsertSpy } = createMockBatch() + const route = createViewBatchRouter(batch) + + route(makeView('view-1', 1)) + route(makeView('view-1', 2)) + route(makeView('view-1', 3)) + + const emittedTypes = upsertSpy.calls.allArgs().map(([event]) => (event as AssembledRumEvent).type) + expect(emittedTypes.every((t) => t === RumEventType.VIEW)).toBeTrue() + }) + }) + + describe('optimization 2 — no VIEW in batch (post-flush)', () => { + beforeEach(() => { + addExperimentalFeatures([ExperimentalFeature.PARTIAL_VIEW_UPDATES]) + registerCleanupTask(resetExperimentalFeatures) + }) + + it('should upsert an aggregate view_update after a flush', () => { + const { batch, upsertSpy, flush } = createMockBatch() + const route = createViewBatchRouter(batch) + + route(makeView('view-1', 1)) + flush() // batchHasFullView resets; batchBase = v1 + + upsertSpy.calls.reset() + + const v2 = makeView('view-1', 2, { + view: { + id: 'view-1', + url: '/home', + referrer: '', + is_active: true, + action: { count: 1 }, + error: { count: 0 }, + long_task: { count: 0 }, + resource: { count: 0 }, + time_spent: 0, + }, + }) + route(v2) + + expect(upsertSpy.calls.count()).toBe(1) + const emitted = upsertSpy.calls.argsFor(0)[0] as AssembledRumEvent + expect(emitted.type).toBe(RumEventType.VIEW_UPDATE) + expect(upsertSpy.calls.argsFor(0)[1]).toBe('view-1') + }) + + it('should aggregate multiple updates into a single view_update per batch', () => { + const { batch, upsertSpy, flush } = createMockBatch() + const route = createViewBatchRouter(batch) + + route(makeView('view-1', 1)) + flush() + + upsertSpy.calls.reset() + + // Three intermediate updates, each with a distinct action.count so each produces a diff. + const makeUpdate = (docVersion: number, actionCount: number) => + makeView('view-1', docVersion, { + view: { + id: 'view-1', + url: '/home', + referrer: '', + is_active: true, + action: { count: actionCount }, + error: { count: 0 }, + long_task: { count: 0 }, + resource: { count: 0 }, + time_spent: 0, + }, + }) + route(makeUpdate(2, 1)) + route(makeUpdate(3, 2)) + route(makeUpdate(4, 3)) + + // Each call upserts with the same key — the last one is what gets sent. + // Assert all three called upsert (not add), all under the same key. + expect(upsertSpy.calls.count()).toBe(3) + upsertSpy.calls.allArgs().forEach(([, key]) => expect(key).toBe('view-1')) + // All emitted events are view_update type (aggregate, not full VIEW) + const emittedTypes = upsertSpy.calls.allArgs().map(([e]) => (e as { type: string }).type) + expect(emittedTypes.every((t) => t === RumEventType.VIEW_UPDATE)).toBeTrue() + }) + + it('should compute the diff from batchBase, not from the previous intermediate update', () => { + const { batch, upsertSpy, flush } = createMockBatch() + const route = createViewBatchRouter(batch) + + // Initial view with action.count = 0 + route(makeView('view-1', 1)) + flush() // batchBase = v1 (action.count: 0) + + upsertSpy.calls.reset() + + // Two intermediate updates: action count goes 0 → 1 → 2 + route( + makeView('view-1', 2, { + view: { + id: 'view-1', + url: '/home', + referrer: '', + is_active: true, + action: { count: 1 }, + error: { count: 0 }, + long_task: { count: 0 }, + resource: { count: 0 }, + time_spent: 0, + }, + }) + ) + route( + makeView('view-1', 3, { + view: { + id: 'view-1', + url: '/home', + referrer: '', + is_active: true, + action: { count: 2 }, + error: { count: 0 }, + long_task: { count: 0 }, + resource: { count: 0 }, + time_spent: 0, + }, + }) + ) + + // The second upsert's aggregate diff must reflect action.count: 2 (diff from base, not from v2) + const lastEmitted = upsertSpy.calls.mostRecent().args[0] as AssembledRumEvent + expect((lastEmitted.view as any).action.count).toBe(2) + }) + + it('should emit nothing if nothing changed since batchBase', () => { + const { batch, upsertSpy, flush } = createMockBatch() + const route = createViewBatchRouter(batch) + + const v1 = makeView('view-1', 1) + route(v1) + flush() + + upsertSpy.calls.reset() + + // Same content, only document_version differs (always-required, ignored in diff) + const v2 = makeView('view-1', 2) + route(v2) + + expect(upsertSpy.calls.count()).toBe(0) + }) + }) + + describe('checkpoint', () => { + beforeEach(() => { + addExperimentalFeatures([ExperimentalFeature.PARTIAL_VIEW_UPDATES]) + registerCleanupTask(resetExperimentalFeatures) + }) + + it('should send a full VIEW after PARTIAL_VIEW_UPDATE_CHECKPOINT_INTERVAL intermediate updates', () => { + const { batch, upsertSpy, flush } = createMockBatch() + const route = createViewBatchRouter(batch) + + route(makeView('view-1', 1)) + flush() + + upsertSpy.calls.reset() + + // Trigger exactly PARTIAL_VIEW_UPDATE_CHECKPOINT_INTERVAL updates + for (let i = 2; i <= PARTIAL_VIEW_UPDATE_CHECKPOINT_INTERVAL + 1; i++) { + route( + makeView('view-1', i, { + view: { + id: 'view-1', + url: '/home', + referrer: '', + is_active: true, + action: { count: i }, + error: { count: 0 }, + long_task: { count: 0 }, + resource: { count: 0 }, + time_spent: 0, + }, + }) + ) + } + + // The last upserted event must be a full VIEW (checkpoint) + const lastEmitted = upsertSpy.calls.mostRecent().args[0] as AssembledRumEvent + expect(lastEmitted.type).toBe(RumEventType.VIEW) + }) + }) + + describe('view lifecycle', () => { + beforeEach(() => { + addExperimentalFeatures([ExperimentalFeature.PARTIAL_VIEW_UPDATES]) + registerCleanupTask(resetExperimentalFeatures) + }) + + it('should send a full VIEW when a new view starts', () => { + const { batch, upsertSpy } = createMockBatch() + const route = createViewBatchRouter(batch) + + route(makeView('view-1', 1)) + route(makeView('view-2', 1)) // new view + + const keys = upsertSpy.calls.allArgs().map(([, key]) => key) + expect(keys).toEqual(['view-1', 'view-2']) + + const types = upsertSpy.calls.allArgs().map(([e]) => (e as AssembledRumEvent).type) + expect(types).toEqual([RumEventType.VIEW, RumEventType.VIEW]) + }) + + it('should send a full VIEW when the view ends', () => { + const { batch, upsertSpy, flush } = createMockBatch() + const route = createViewBatchRouter(batch) + + route(makeView('view-1', 1)) + flush() + + upsertSpy.calls.reset() + + const endView = makeView('view-1', 2, { + view: { + id: 'view-1', + url: '/home', + referrer: '', + is_active: false, + action: { count: 0 }, + error: { count: 0 }, + long_task: { count: 0 }, + resource: { count: 0 }, + time_spent: 0, + }, + }) + route(endView) + + expect(upsertSpy.calls.count()).toBe(1) + expect((upsertSpy.calls.argsFor(0)[0] as AssembledRumEvent).type).toBe(RumEventType.VIEW) + expect((upsertSpy.calls.argsFor(0)[0] as any).view.is_active).toBe(false) + }) + + it('should reset to opt-1 after a new view starts following a flush', () => { + const { batch, upsertSpy, flush } = createMockBatch() + const route = createViewBatchRouter(batch) + + route(makeView('view-1', 1)) + flush() + + // Start a new view in the next batch + route(makeView('view-2', 1)) + // Intermediate update for view-2 — should be opt-1 (full VIEW), not opt-2 (view_update) + route( + makeView('view-2', 2, { + view: { + id: 'view-2', + url: '/home', + referrer: '', + is_active: true, + action: { count: 1 }, + error: { count: 0 }, + long_task: { count: 0 }, + resource: { count: 0 }, + time_spent: 0, + }, + }) + ) + + const view2Calls = upsertSpy.calls.allArgs().filter(([, k]) => k === 'view-2') + const emittedTypes = view2Calls.map(([e]) => (e as AssembledRumEvent).type) + expect(emittedTypes.every((t) => t === RumEventType.VIEW)).toBeTrue() + }) + }) +}) diff --git a/packages/browser-rum-core/src/transport/startRumBatch.ts b/packages/browser-rum-core/src/transport/startRumBatch.ts index af34570629..66ad1c7f52 100644 --- a/packages/browser-rum-core/src/transport/startRumBatch.ts +++ b/packages/browser-rum-core/src/transport/startRumBatch.ts @@ -15,17 +15,22 @@ import type { RumConfiguration } from '../domain/configuration' import type { LifeCycle } from '../domain/lifeCycle' import { LifeCycleEventType } from '../domain/lifeCycle' import type { AssembledRumEvent } from '../rawRumEvent.types' -import type { RumViewEvent, RumViewUpdateEvent } from '../rumEvent.types' +import type { RumViewUpdateEvent } from '../rumEvent.types' import { RumEventType } from '../rawRumEvent.types' import { diffMerge } from '../domain/view/viewDiff' +type AssembledViewEvent = Extract + export const PARTIAL_VIEW_UPDATE_CHECKPOINT_INTERVAL = 100 -export function assembleViewUpdateEvent( - current: RumViewEvent, - last: RumViewEvent -): (RumViewUpdateEvent & Context) | undefined { - const diff = diffMerge(current, last, { +export function computeAssembledViewDiff( + current: AssembledViewEvent, + last: AssembledViewEvent +): RumViewUpdateEvent | undefined { + const currentObj = current as unknown as Record + const lastObj = last as unknown as Record + + const diff = diffMerge(currentObj, lastObj, { // context, connectivity, usr, device, privacy are objects — use REPLACE to avoid partial updates replaceKeys: new Set(['view.custom_timings', 'context', 'connectivity', 'usr', 'device', 'privacy']), appendKeys: new Set(['_dd.page_states']), @@ -46,104 +51,162 @@ export function assembleViewUpdateEvent( return undefined } - // Restore the ignoreKeys — backend needs them on every event + const currentView = currentObj.view as Record + const currentDd = currentObj._dd as Record + + // Merge always-required fields on top of the diff for backend routing return combine(diff, { type: RumEventType.VIEW_UPDATE, - date: current.date, - application: current.application, - session: current.session, + date: currentObj.date, + application: currentObj.application, + session: currentObj.session, view: { - id: current.view.id, - url: current.view.url, + id: currentView.id, + url: currentView.url, }, _dd: { - document_version: current._dd.document_version, - format_version: current._dd.format_version, + document_version: currentDd.document_version, + format_version: currentDd.format_version, }, - }) as RumViewUpdateEvent & Context + }) as unknown as RumViewUpdateEvent } -export function startRumBatch( - configuration: RumConfiguration, - lifeCycle: LifeCycle, - reportError: (error: RawError) => void, - pageMayExitObservable: Observable, - sessionExpireObservable: Observable, - createEncoder: (streamId: DeflateEncoderStreamId) => Encoder +/** + * Creates the VIEW routing handler for the RUM batch. + * + * Two optimizations over the naive "always upsert full view" approach: + * + * Optimization 1 — VIEW already in batch: + * Intermediate updates upsert the latest full VIEW (same key), keeping a single up-to-date + * entry. No view_update event is emitted. Equivalent to the non-experimental path. + * + * Optimization 2 — no VIEW in batch (post-flush): + * Intermediate updates compute an aggregate diff from batchBase (the state the backend + * received in the last batch) and upsert it under the same key. Multiple updates in the + * same batch produce one view_update, not N. + * + * On each flush, batchHasFullView resets and batchBase advances to lastSentView. + * The checkpoint (every N updates in opt-2) upserts a full VIEW, replacing any pending + * view_update under the same key. + */ +export function createViewBatchRouter( + batch: Pick, 'flushController' | 'add' | 'upsert'> ) { - const endpoints = [createEndpointBuilder(configuration, 'rum')] - const replicaEndpoint = createReplicaEndpointBuilder(configuration, 'rum') - if (replicaEndpoint) { - endpoints.push(replicaEndpoint) - } + let lastSentView: AssembledViewEvent | undefined + // Base used to compute the aggregate diff for the current batch's view_update. + // Reset to lastSentView on each flush (= what the backend received in the previous batch). + let batchBase: AssembledViewEvent | undefined + // True when the current batch already contains a full VIEW event for the active view. + // Reset to false on each flush. + let batchHasFullView = false + let viewUpdatesSinceCheckpoint = 0 - const batch = createBatch({ - encoder: createEncoder(DeflateEncoderStreamId.RUM), - request: createHttpRequest(endpoints, reportError), - flushController: createFlushController({ - pageMayExitObservable, - sessionExpireObservable, - }), + // On flush: advance batchBase to lastSentView (what the backend now has) and clear the flag. + batch.flushController.flushObservable.subscribe(() => { + batchHasFullView = false + batchBase = lastSentView }) - let lastSentView: RumViewEvent | undefined - let viewUpdatesSinceCheckpoint = 0 - - lifeCycle.subscribe(LifeCycleEventType.RUM_EVENT_COLLECTED, (serverRumEvent: AssembledRumEvent) => { + return (serverRumEvent: AssembledRumEvent) => { if (serverRumEvent.type !== RumEventType.VIEW) { // Non-view events: always append batch.add(serverRumEvent) return } - const viewEvent = serverRumEvent as RumViewEvent - if (!isExperimentalFeatureEnabled(ExperimentalFeature.PARTIAL_VIEW_UPDATES)) { // Feature OFF: existing behavior — upsert full view - batch.upsert(viewEvent as unknown as Context, viewEvent.view.id) + batch.upsert(serverRumEvent, serverRumEvent.view.id) return } - const viewId = viewEvent.view.id + const viewId = serverRumEvent.view.id - // View ended (is_active: false) — always send a full view for backend recovery - if (!viewEvent.view.is_active) { - lastSentView = undefined + // New view started + if (viewId !== lastSentView?.view.id) { + lastSentView = serverRumEvent + batchBase = serverRumEvent + batchHasFullView = true viewUpdatesSinceCheckpoint = 0 - batch.upsert(viewEvent as unknown as Context, viewId) + batch.upsert(serverRumEvent, viewId) return } - // New view started - if (viewId !== lastSentView?.view.id) { - lastSentView = viewEvent + // View ended (is_active: false) + if (!serverRumEvent.view.is_active) { + lastSentView = undefined + batchBase = undefined + batchHasFullView = false viewUpdatesSinceCheckpoint = 0 - batch.upsert(viewEvent as unknown as Context, viewId) + batch.upsert(serverRumEvent, viewId) + return + } + + // Intermediate update + lastSentView = serverRumEvent + + if (batchHasFullView) { + // Optimization 1: batch already has a full VIEW — replace it with the latest full view. + // This is equivalent to the non-experimental upsert behavior for in-flight batches. + batch.upsert(serverRumEvent, viewId) return } - // Checkpoint: every N intermediate updates, send a full view + // Optimization 2: no full VIEW in the current batch — compute an aggregate diff from + // batchBase (last state the backend received) and upsert it under the same key as the VIEW. + // Each update replaces the previous aggregate, so the batch always contains at most one + // view_update per view ID. + // + // Note: view_update events are created here, post-assembly, bypassing + // RAW_RUM_EVENT_COLLECTED → assembly → RUM_EVENT_COLLECTED. They intentionally skip + // beforeSend — view_update is an internal bandwidth optimization, not a customer-visible type. + + // Checkpoint: periodically send a full VIEW for backend reliability. + // When the checkpoint fires, upsert(VIEW) replaces any pending view_update (same key). viewUpdatesSinceCheckpoint += 1 if (viewUpdatesSinceCheckpoint >= PARTIAL_VIEW_UPDATE_CHECKPOINT_INTERVAL) { viewUpdatesSinceCheckpoint = 0 - lastSentView = viewEvent - batch.upsert(viewEvent as unknown as Context, viewId) + batchHasFullView = true + batchBase = serverRumEvent + batch.upsert(serverRumEvent, viewId) return } - // Intermediate update: compute diff and send view_update. - // Note: view_update events are created here, post-assembly, and go directly to batch.add(). - // They intentionally bypass RAW_RUM_EVENT_COLLECTED → assembly → RUM_EVENT_COLLECTED, which - // means they skip beforeSend entirely. view_update is an internal bandwidth optimization — - // not a customer-visible event type, and not modifiable via beforeSend. - const diff = assembleViewUpdateEvent(viewEvent, lastSentView) - lastSentView = viewEvent - if (diff) { - sendToExtension('rum', diff) - batch.add(diff) + if (batchBase) { + const diff = computeAssembledViewDiff(serverRumEvent, batchBase) + if (diff) { + sendToExtension('rum', diff) + batch.upsert(diff as unknown as Context, viewId) + } } - // If diff is undefined (nothing changed), skip — no event emitted + } +} + +export function startRumBatch( + configuration: RumConfiguration, + lifeCycle: LifeCycle, + reportError: (error: RawError) => void, + pageMayExitObservable: Observable, + sessionExpireObservable: Observable, + createEncoder: (streamId: DeflateEncoderStreamId) => Encoder +) { + const endpoints = [createEndpointBuilder(configuration, 'rum')] + const replicaEndpoint = createReplicaEndpointBuilder(configuration, 'rum') + if (replicaEndpoint) { + endpoints.push(replicaEndpoint) + } + + const batch = createBatch({ + encoder: createEncoder(DeflateEncoderStreamId.RUM), + request: createHttpRequest(endpoints, reportError), + flushController: createFlushController({ + pageMayExitObservable, + sessionExpireObservable, + }), }) + const route = createViewBatchRouter(batch) + lifeCycle.subscribe(LifeCycleEventType.RUM_EVENT_COLLECTED, route) + return batch } diff --git a/test/e2e/scenario/rum/partialViewUpdates.scenario.ts b/test/e2e/scenario/rum/partialViewUpdates.scenario.ts index ad84e020f1..8acf0afc65 100644 --- a/test/e2e/scenario/rum/partialViewUpdates.scenario.ts +++ b/test/e2e/scenario/rum/partialViewUpdates.scenario.ts @@ -1,15 +1,39 @@ import { test, expect } from '@playwright/test' -import { createTest, waitForRequests } from '../../lib/framework' +import { createTest, html, waitForRequests } from '../../lib/framework' +import type { IntakeRegistry } from '../../lib/framework' + +// Loose type for view_update events received at the intake (no generated schema type yet) +interface ViewUpdateEvent { + type: string + date: number + application: { id: string } + session: { id: string } + view: { id: string; is_active?: boolean; [key: string]: unknown } + _dd: { document_version: number; [key: string]: unknown } + [key: string]: unknown +} + +// Helper: extract view_update events from all RUM events +// (intakeRegistry.rumViewEvents only returns type==='view') +function getViewUpdateEvents(intakeRegistry: IntakeRegistry): ViewUpdateEvent[] { + return intakeRegistry.rumEvents.filter((e) => (e.type as string) === 'view_update') as unknown as ViewUpdateEvent[] +} test.describe('partial view updates', () => { - createTest('should send view_update events after the initial view event') + createTest('should send a view_update event when the update arrives in a new batch') .withRum({ enableExperimentalFeatures: ['partial_view_updates'], }) .run(async ({ intakeRegistry, flushEvents, page }) => { - // Trigger a user action to cause a view update with changed metrics + // Flush the initial VIEW so it lands in its own batch. Any update that arrives + // while a VIEW is still in the batch is handled as opt-1 (full-view upsert, no view_update). + // Flushing first puts the next update in a fresh batch, triggering opt-2 (view_update). + await page.evaluate(() => window.dispatchEvent(new Event('beforeunload'))) + await waitForRequests(page) + + // setViewName triggers a view update immediately (no THROTTLE_VIEW_UPDATE_PERIOD delay). await page.evaluate(() => { - window.DD_RUM!.addAction('test-action') + window.DD_RUM!.setViewName('updated-name') }) await flushEvents() @@ -19,10 +43,9 @@ test.describe('partial view updates', () => { expect(viewEvents.length).toBeGreaterThanOrEqual(1) expect(viewEvents[0].type).toBe('view') - // Should have at least one view_update with the updated action count - const viewUpdateEvents = intakeRegistry.rumViewUpdateEvents + // Should have at least one view_update + const viewUpdateEvents = getViewUpdateEvents(intakeRegistry) expect(viewUpdateEvents.length).toBeGreaterThanOrEqual(1) - expect(viewUpdateEvents.some((e) => (e.view as { action?: { count: number } }).action?.count)).toBe(true) // All events share the same view.id const viewId = viewEvents[0].view.id @@ -31,27 +54,56 @@ test.describe('partial view updates', () => { } }) - createTest('should have monotonically increasing document_version across view and view_update events') + createTest('should upsert the full VIEW when a view update arrives in the same batch') .withRum({ enableExperimentalFeatures: ['partial_view_updates'], }) .run(async ({ intakeRegistry, flushEvents, page }) => { + // Trigger a view update without flushing first — it lands in the same batch as the initial VIEW. + // Optimization 1: the SDK replaces the VIEW in the batch (upsert) and emits no view_update. await page.evaluate(() => { - window.DD_RUM!.addAction('test-action') + window.DD_RUM!.setViewName('updated-name') }) await flushEvents() - // Collect document_versions from all view-related events (view + view_update) - const allDocVersions = [ - ...intakeRegistry.rumViewEvents.map((e) => e._dd.document_version), - ...intakeRegistry.rumViewUpdateEvents.map((e) => (e._dd as { document_version: number }).document_version), - ] + // Should have exactly one full VIEW event (the latest state after all in-batch updates) + expect(intakeRegistry.rumViewEvents.length).toBeGreaterThanOrEqual(1) - expect(allDocVersions.length).toBeGreaterThanOrEqual(2) + // Should NOT have any view_update events (opt-1: full VIEW replaced in batch) + const viewUpdateEvents = getViewUpdateEvents(intakeRegistry) + expect(viewUpdateEvents).toHaveLength(0) + }) - // Verify all document_versions are unique (no duplicates) - expect(new Set(allDocVersions).size).toBe(allDocVersions.length) + createTest('should have strictly increasing document_version across view and view_update events') + .withRum({ + enableExperimentalFeatures: ['partial_view_updates'], + }) + .run(async ({ intakeRegistry, flushEvents, page }) => { + // Flush the initial VIEW first so the subsequent update lands in a new batch (opt-2 path). + await page.evaluate(() => window.dispatchEvent(new Event('beforeunload'))) + await waitForRequests(page) + + await page.evaluate(() => { + window.DD_RUM!.setViewName('updated-name') + }) + + await flushEvents() + + // Collect all view-related events (view + view_update) sorted by document_version + const allViewRelatedEvents = [ + ...intakeRegistry.rumViewEvents.map((e) => ({ _dd: e._dd })), + ...getViewUpdateEvents(intakeRegistry).map((e) => ({ _dd: e._dd })), + ].sort((a, b) => a._dd.document_version - b._dd.document_version) + + expect(allViewRelatedEvents.length).toBeGreaterThanOrEqual(2) + + // Verify monotonic increase + for (let i = 1; i < allViewRelatedEvents.length; i++) { + expect(allViewRelatedEvents[i]._dd.document_version).toBeGreaterThan( + allViewRelatedEvents[i - 1]._dd.document_version + ) + } }) createTest('should only send view events when feature flag is not enabled') @@ -67,7 +119,7 @@ test.describe('partial view updates', () => { expect(intakeRegistry.rumViewEvents.length).toBeGreaterThanOrEqual(1) // Should NOT have any view_update events - const viewUpdateEvents = intakeRegistry.rumViewUpdateEvents + const viewUpdateEvents = getViewUpdateEvents(intakeRegistry) expect(viewUpdateEvents).toHaveLength(0) }) @@ -75,8 +127,17 @@ test.describe('partial view updates', () => { .withRum({ enableExperimentalFeatures: ['partial_view_updates'], }) + .withBody(html` + Navigate + + `) .run(async ({ intakeRegistry, flushEvents, page }) => { - await page.evaluate(() => history.pushState(null, '', '/new-page')) + // Trigger a route change to create a new view + await page.click('#nav-link') await flushEvents() @@ -94,13 +155,17 @@ test.describe('partial view updates', () => { enableExperimentalFeatures: ['partial_view_updates'], }) .run(async ({ intakeRegistry, flushEvents, page }) => { + // Flush the initial VIEW first so the update lands in a new batch (opt-2 path). + await page.evaluate(() => window.dispatchEvent(new Event('beforeunload'))) + await waitForRequests(page) + await page.evaluate(() => { - window.DD_RUM!.addAction('test-action') + window.DD_RUM!.setViewName('updated-name') }) await flushEvents() - const viewUpdateEvents = intakeRegistry.rumViewUpdateEvents + const viewUpdateEvents = getViewUpdateEvents(intakeRegistry) expect(viewUpdateEvents.length).toBeGreaterThanOrEqual(1) for (const event of viewUpdateEvents) { @@ -118,8 +183,17 @@ test.describe('partial view updates', () => { .withRum({ enableExperimentalFeatures: ['partial_view_updates'], }) + .withBody(html` + Navigate + + `) .run(async ({ intakeRegistry, flushEvents, page }) => { - await page.evaluate(() => history.pushState(null, '', '/other-page')) + // Navigate to trigger view end on the first view + await page.click('#nav-link') await flushEvents() @@ -131,7 +205,7 @@ test.describe('partial view updates', () => { expect(endEvent?.type).toBe('view') // No view_update should have is_active: false - const viewUpdateEvents = intakeRegistry.rumViewUpdateEvents + const viewUpdateEvents = getViewUpdateEvents(intakeRegistry) const endUpdateEvent = viewUpdateEvents.find((e) => e.view.id === firstViewId && e.view.is_active === false) expect(endUpdateEvent).toBeUndefined() }) @@ -141,9 +215,12 @@ test.describe('partial view updates', () => { enableExperimentalFeatures: ['partial_view_updates'], }) .run(async ({ intakeRegistry, flushEvents, page }) => { - // Flush the initial view first so it arrives at the intake in its own batch. - // The checkpoint (every 100 updates) uses batch.upsert with the same viewId which would - // replace the initial view if they share a batch — flushing first prevents that. + // Flush the initial VIEW first so it arrives at the intake in its own batch. + // If the initial VIEW and the 102 updates were in the same batch, optimization 1 would + // keep upserting the full VIEW (no view_update), and the checkpoint would replace it + // in the same slot — we'd see only one VIEW event total instead of two. + // Flushing first ensures subsequent updates go through the opt-2 path, where the + // checkpoint sends a second full VIEW in a later batch. // Dispatching beforeunload triggers the SDK batch send without navigating away. await page.evaluate(() => window.dispatchEvent(new Event('beforeunload'))) await waitForRequests(page)