diff --git a/.changeset/sweet-pigs-pump.md b/.changeset/sweet-pigs-pump.md new file mode 100644 index 00000000..461673b0 --- /dev/null +++ b/.changeset/sweet-pigs-pump.md @@ -0,0 +1,5 @@ +--- +"@livekit/rtc-node": patch +--- + +feat: add data tracks support diff --git a/examples/data-tracks/.env.example b/examples/data-tracks/.env.example new file mode 100644 index 00000000..99816257 --- /dev/null +++ b/examples/data-tracks/.env.example @@ -0,0 +1,6 @@ +# 1. Copy this file and rename it to .env.local +# 2. Update the environment variables below. + +LIVEKIT_API_KEY=mykey +LIVEKIT_API_SECRET=mysecret +LIVEKIT_URL=wss://myproject.livekit.cloud diff --git a/examples/data-tracks/README.md b/examples/data-tracks/README.md new file mode 100644 index 00000000..3f22cb4f --- /dev/null +++ b/examples/data-tracks/README.md @@ -0,0 +1,46 @@ +# Data Tracks Example + +This example demonstrates how to publish and subscribe to [data tracks](https://docs.livekit.io/transport/data/data-tracks/) in LiveKit. It consists of two scripts: + +- **publisher** — Connects to a room, publishes a data track, and pushes frames at a regular interval. +- **subscriber** — Connects to a room, listens for published data tracks, subscribes, and logs received frames. + +## Prerequisites + +Before running this example, make sure you have: + +1. Node.js installed on your machine. +2. A LiveKit server running (either locally or remotely). +3. LiveKit API key and secret. + +## Setup + +1. Install dependencies: + + ``` + pnpm install + ``` + +2. Create a `.env.local` file in the example directory with your LiveKit credentials: + + ``` + LIVEKIT_API_KEY=your_api_key + LIVEKIT_API_SECRET=your_api_secret + LIVEKIT_URL=your_livekit_url + ``` + +## Running the Example + +Start the subscriber in one terminal: + +``` +pnpm run subscriber +``` + +Then start the publisher in another terminal: + +``` +pnpm run publisher +``` + +The subscriber will log received frames and their latency to the terminal. diff --git a/examples/data-tracks/package.json b/examples/data-tracks/package.json new file mode 100644 index 00000000..d7c7fb52 --- /dev/null +++ b/examples/data-tracks/package.json @@ -0,0 +1,23 @@ +{ + "name": "example-data-tracks", + "author": "LiveKit", + "private": true, + "description": "Example of using data tracks in LiveKit", + "type": "module", + "scripts": { + "lint": "eslint -f unix \"**/*.ts\"", + "publisher": "tsx publisher.ts", + "subscriber": "tsx subscriber.ts" + }, + "keywords": [], + "license": "Apache-2.0", + "dependencies": { + "@livekit/rtc-node": "workspace:*", + "dotenv": "^16.4.5", + "livekit-server-sdk": "workspace:*" + }, + "devDependencies": { + "@types/node": "^20.10.4", + "tsx": "^4.7.1" + } +} diff --git a/examples/data-tracks/publisher.ts b/examples/data-tracks/publisher.ts new file mode 100644 index 00000000..218c4d80 --- /dev/null +++ b/examples/data-tracks/publisher.ts @@ -0,0 +1,70 @@ +import { + type DataTrackFrame, + DataTrackPushFrameError, + type LocalDataTrack, + Room, +} from '@livekit/rtc-node'; +import { config } from 'dotenv'; +import { setTimeout } from 'node:timers/promises'; +import { AccessToken } from 'livekit-server-sdk'; + +config({ path: '.env.local', override: false }); +const LIVEKIT_API_KEY = process.env.LIVEKIT_API_KEY; +const LIVEKIT_API_SECRET = process.env.LIVEKIT_API_SECRET; +const LIVEKIT_URL = process.env.LIVEKIT_URL; +if (!LIVEKIT_API_KEY || !LIVEKIT_API_SECRET || !LIVEKIT_URL) { + throw new Error('Missing required environment variables. Please check your .env.local file.'); +} + +async function readSensor(): Promise { + return new Uint8Array(256).fill(0xfa); +} + +async function pushFrames(track: LocalDataTrack) { + while (true) { + console.log('Pushing frame'); + const data = await readSensor(); + try { + const frame: DataTrackFrame = { + payload: data, + userTimestamp: BigInt(Date.now()), + }; + track.tryPush(frame); + } catch (e) { + if (e instanceof DataTrackPushFrameError) { + console.error('Failed to push frame:', e.message); + } else { + throw e; + } + } + await setTimeout(500); + } +} + +const main = async () => { + const roomName = 'data-track-demo'; + const identity = 'publisher'; + const token = await createToken(identity, roomName); + + const room = new Room(); + await room.connect(LIVEKIT_URL, token); + console.log('connected to room', room.name); + + const track = await room.localParticipant.publishDataTrack({ name: 'my_sensor_data' }); + await pushFrames(track); +}; + +const createToken = async (identity: string, roomName: string) => { + const token = new AccessToken(LIVEKIT_API_KEY, LIVEKIT_API_SECRET, { + identity, + }); + token.addGrant({ + room: roomName, + roomJoin: true, + roomCreate: true, + canPublish: true, + }); + return await token.toJwt(); +}; + +main(); diff --git a/examples/data-tracks/subscriber.ts b/examples/data-tracks/subscriber.ts new file mode 100644 index 00000000..539b8360 --- /dev/null +++ b/examples/data-tracks/subscriber.ts @@ -0,0 +1,58 @@ +import { type RemoteDataTrack, Room, RoomEvent } from '@livekit/rtc-node'; +import { config } from 'dotenv'; +import { AccessToken } from 'livekit-server-sdk'; + +config({ path: '.env.local', override: false }); +const LIVEKIT_API_KEY = process.env.LIVEKIT_API_KEY; +const LIVEKIT_API_SECRET = process.env.LIVEKIT_API_SECRET; +const LIVEKIT_URL = process.env.LIVEKIT_URL; +if (!LIVEKIT_API_KEY || !LIVEKIT_API_SECRET || !LIVEKIT_URL) { + throw new Error('Missing required environment variables. Please check your .env.local file.'); +} + +async function subscribe(track: RemoteDataTrack) { + console.log( + `Subscribing to '${track.info.name}' published by '${track.publisherIdentity}'`, + ); + const stream = track.subscribe(); + for await (const frame of stream) { + console.log(`Received frame (${frame.payload.byteLength} bytes)`); + + if (frame.userTimestamp) { + const latencyMs = Date.now() - Number(frame.userTimestamp); + console.log(`Latency: ${latencyMs}ms`); + } + } +} + +const main = async () => { + const roomName = 'data-track-demo'; + const identity = 'subscriber'; + const token = await createToken(identity, roomName); + + const room = new Room(); + + room.on(RoomEvent.DataTrackPublished, (track: RemoteDataTrack) => { + subscribe(track).catch((e) => { + console.error(`Failed to subscribe to '${track.info.name}':`, e); + }); + }); + + await room.connect(LIVEKIT_URL, token); + console.log('connected to room', room.name); +}; + +const createToken = async (identity: string, roomName: string) => { + const token = new AccessToken(LIVEKIT_API_KEY, LIVEKIT_API_SECRET, { + identity, + }); + token.addGrant({ + room: roomName, + roomJoin: true, + roomCreate: true, + canSubscribe: true, + }); + return await token.toJwt(); +}; + +main(); diff --git a/packages/livekit-rtc/src/data_tracks/index.ts b/packages/livekit-rtc/src/data_tracks/index.ts new file mode 100644 index 00000000..916a8850 --- /dev/null +++ b/packages/livekit-rtc/src/data_tracks/index.ts @@ -0,0 +1,15 @@ +// SPDX-FileCopyrightText: 2026 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 + +export { + type DataTrackFrame, + type DataTrackInfo, + type DataTrackOptions, + type DataTrackSubscribeOptions, + PublishDataTrackError, + DataTrackPushFrameError, +} from './types.js'; + +export { LocalDataTrack } from './local.js'; +export { RemoteDataTrack } from './remote.js'; diff --git a/packages/livekit-rtc/src/data_tracks/local.ts b/packages/livekit-rtc/src/data_tracks/local.ts new file mode 100644 index 00000000..6699a47f --- /dev/null +++ b/packages/livekit-rtc/src/data_tracks/local.ts @@ -0,0 +1,99 @@ +// SPDX-FileCopyrightText: 2026 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +import type { + LocalDataTrackIsPublishedResponse, + LocalDataTrackTryPushResponse, + OwnedLocalDataTrack, +} from '@livekit/rtc-ffi-bindings'; +import { + LocalDataTrackIsPublishedRequest, + LocalDataTrackTryPushRequest, + LocalDataTrackUnpublishRequest, + DataTrackFrame as ProtoDataTrackFrame, +} from '@livekit/rtc-ffi-bindings'; +import { FfiClient, FfiHandle } from '../ffi_client.js'; +import type { DataTrackFrame, DataTrackInfo } from './types.js'; +import { DataTrackPushFrameError } from './types.js'; + +/** Data track published by the local participant. */ +export class LocalDataTrack { + private _info: DataTrackInfo; + private ffiHandle: FfiHandle; + + /** @internal */ + constructor(ownedTrack: OwnedLocalDataTrack) { + this._info = { + sid: ownedTrack.info!.sid!, + name: ownedTrack.info!.name!, + usesE2ee: ownedTrack.info!.usesE2ee!, + }; + this.ffiHandle = new FfiHandle(ownedTrack.handle!.id!); + } + + /** Information about the data track. */ + get info(): DataTrackInfo { + return this._info; + } + + /** Whether or not the track is still published. */ + isPublished(): boolean { + const res = FfiClient.instance.request({ + message: { + case: 'localDataTrackIsPublished', + value: new LocalDataTrackIsPublishedRequest({ + trackHandle: this.ffiHandle.handle, + }), + }, + }); + return res.isPublished!; + } + + /** + * Try pushing a frame to subscribers of the track. + * + * See {@link DataTrackFrame} for how to construct a frame and attach metadata. + * + * Pushing a frame can fail for several reasons: + * + * - The track has been unpublished by the local participant or SFU + * - The room is no longer connected + * + * @throws {@link DataTrackPushFrameError} If the push fails. + */ + tryPush(frame: DataTrackFrame): void { + const protoFrame = new ProtoDataTrackFrame({ + payload: frame.payload, + userTimestamp: frame.userTimestamp, + }); + + const res = FfiClient.instance.request({ + message: { + case: 'localDataTrackTryPush', + value: new LocalDataTrackTryPushRequest({ + trackHandle: this.ffiHandle.handle, + frame: protoFrame, + }), + }, + }); + + if (res.error) { + throw new DataTrackPushFrameError(res.error.message!); + } + } + + /** + * Unpublish the track from the SFU. Once this is called, any further calls to + * {@link tryPush} will fail. + */ + async unpublish(): Promise { + FfiClient.instance.request({ + message: { + case: 'localDataTrackUnpublish', + value: new LocalDataTrackUnpublishRequest({ + trackHandle: this.ffiHandle.handle, + }), + }, + }); + } +} diff --git a/packages/livekit-rtc/src/data_tracks/remote.ts b/packages/livekit-rtc/src/data_tracks/remote.ts new file mode 100644 index 00000000..a083c11b --- /dev/null +++ b/packages/livekit-rtc/src/data_tracks/remote.ts @@ -0,0 +1,130 @@ +// SPDX-FileCopyrightText: 2026 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +import type { + DataTrackStreamEvent, + OwnedDataTrackStream, + OwnedRemoteDataTrack, + SubscribeDataTrackResponse, +} from '@livekit/rtc-ffi-bindings'; +import { + DataTrackStreamReadRequest, + DataTrackSubscribeOptions as ProtoDataTrackSubscribeOptions, + SubscribeDataTrackRequest, +} from '@livekit/rtc-ffi-bindings'; +import type { UnderlyingSource } from 'node:stream/web'; +import { FfiClient, FfiHandle } from '../ffi_client.js'; +import type { DataTrackFrame, DataTrackInfo, DataTrackSubscribeOptions } from './types.js'; + +/** Data track published by a remote participant. */ +export class RemoteDataTrack { + /** Information about the data track. */ + info: DataTrackInfo; + /** Identity of the participant who published the track. */ + publisherIdentity: string; + private ffiHandle: FfiHandle; + + /** @internal */ + constructor(ownedTrack: OwnedRemoteDataTrack) { + this.info = { + sid: ownedTrack.info!.sid!, + name: ownedTrack.info!.name!, + usesE2ee: ownedTrack.info!.usesE2ee!, + }; + this.publisherIdentity = ownedTrack.publisherIdentity!; + this.ffiHandle = new FfiHandle(ownedTrack.handle!.id!); + } + + /** + * Subscribes to the data track to receive frames. + * + * Returns a `ReadableStream` that yields {@link DataTrackFrame}s as they arrive. + * + * An application may call `subscribe` more than once to process frames in multiple places. + * Internally, only the first call communicates with the SFU and allocates the resources + * required to receive frames. Additional subscriptions reuse the same underlying pipeline + * and do not trigger additional signaling. + * + * Note that newly created subscriptions only receive frames published after the initial + * subscription is established. + */ + subscribe(options?: DataTrackSubscribeOptions): ReadableStream { + const opts = new ProtoDataTrackSubscribeOptions({ + bufferSize: options?.bufferSize, + }); + + const res = FfiClient.instance.request({ + message: { + case: 'subscribeDataTrack', + value: new SubscribeDataTrackRequest({ + trackHandle: this.ffiHandle.handle, + options: opts, + }), + }, + }); + + return new ReadableStream(new DataTrackStreamSource(res.stream!), { + highWaterMark: 0, // Buffer owned by Rust + }); + } +} + +class DataTrackStreamSource implements UnderlyingSource { + private ffiHandle: FfiHandle; + private streamHandle: bigint; + private disposed = false; + + constructor(ownedStream: OwnedDataTrackStream) { + this.ffiHandle = new FfiHandle(ownedStream.handle!.id!); + this.streamHandle = ownedStream.handle!.id!; + } + + async pull(controller: ReadableStreamDefaultController): Promise { + FfiClient.instance.request({ + message: { + case: 'dataTrackStreamRead', + value: new DataTrackStreamReadRequest({ + streamHandle: this.streamHandle, + }), + }, + }); + + const event = await FfiClient.instance.waitFor((ev) => { + return ( + ev.message.case === 'dataTrackStreamEvent' && + ev.message.value.streamHandle === this.streamHandle + ); + }); + + switch (event.detail.case) { + case 'frameReceived': { + const protoFrame = event.detail.value.frame!; + controller.enqueue({ + payload: protoFrame.payload!, + userTimestamp: protoFrame.userTimestamp, + }); + break; + } + case 'eos': { + this.dispose(); + if (event.detail.value.error) { + controller.error(new Error(event.detail.value.error)); + } else { + controller.close(); + } + break; + } + } + } + + cancel(): void { + this.dispose(); + } + + private dispose(): void { + if (!this.disposed) { + this.disposed = true; + this.ffiHandle.dispose(); + } + } +} diff --git a/packages/livekit-rtc/src/data_tracks/types.ts b/packages/livekit-rtc/src/data_tracks/types.ts new file mode 100644 index 00000000..c47aa036 --- /dev/null +++ b/packages/livekit-rtc/src/data_tracks/types.ts @@ -0,0 +1,63 @@ +// SPDX-FileCopyrightText: 2026 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 + +/** A frame published on a data track, consisting of a payload and optional metadata. */ +export type DataTrackFrame = { + /** The frame's payload. */ + payload: Uint8Array; + /** The frame's user timestamp, if one is associated. */ + userTimestamp?: bigint; +}; + +/** Information about a published data track. */ +export type DataTrackInfo = { + /** + * Unique track identifier assigned by the SFU. + * + * This identifier may change if a reconnect occurs. Use {@link DataTrackInfo.name | name} + * if a stable identifier is needed. + */ + sid: string; + /** Name of the track assigned by the publisher. */ + name: string; + /** Whether or not frames sent on the track use end-to-end encryption. */ + usesE2ee: boolean; +}; + +/** Options for publishing a data track. */ +export type DataTrackOptions = { + name: string; +}; + +export type DataTrackSubscribeOptions = { + /** + * The number of {@link DataTrackFrame}s to hold in the ReadableStream before discarding + * extra frames. When omitted, the default buffer size is used. + */ + bufferSize?: number; +}; + +/** An error that can occur when publishing a data track. */ +export class PublishDataTrackError extends Error { + constructor(message: string) { + super(message); + this.name = 'PublishDataTrackError'; + } +} + +/** + * Frame could not be pushed to a data track. + * + * Pushing a frame can fail for several reasons: + * + * - The track has been unpublished by the local participant or SFU + * - The room is no longer connected + * - Frames are being pushed too fast + */ +export class DataTrackPushFrameError extends Error { + constructor(message: string) { + super(message); + this.name = 'DataTrackPushFrameError'; + } +} diff --git a/packages/livekit-rtc/src/index.ts b/packages/livekit-rtc/src/index.ts index 6cc9652f..f30415dc 100644 --- a/packages/livekit-rtc/src/index.ts +++ b/packages/livekit-rtc/src/index.ts @@ -10,6 +10,18 @@ export type { NoiseCancellationOptions } from './audio_stream.js'; export { AudioFilter } from './audio_filter.js'; export { AudioMixer, type AudioMixerOptions } from './audio_mixer.js'; export * from './data_streams/index.js'; +export { + LocalDataTrack, + RemoteDataTrack, + PublishDataTrackError, + DataTrackPushFrameError, +} from './data_tracks/index.js'; +export type { + DataTrackFrame, + DataTrackInfo, + DataTrackOptions, + DataTrackSubscribeOptions, +} from './data_tracks/index.js'; export { E2EEManager, FrameCryptor, KeyProvider } from './e2ee.js'; export type { E2EEOptions, KeyProviderOptions } from './e2ee.js'; export { dispose } from './ffi_client.js'; diff --git a/packages/livekit-rtc/src/participant.ts b/packages/livekit-rtc/src/participant.ts index 5abdc2ea..37b4ce91 100644 --- a/packages/livekit-rtc/src/participant.ts +++ b/packages/livekit-rtc/src/participant.ts @@ -69,6 +69,11 @@ import { RpcMethodInvocationResponseRequest, UnregisterRpcMethodRequest, } from '@livekit/rtc-ffi-bindings'; +import type { PublishDataTrackCallback, PublishDataTrackResponse } from '@livekit/rtc-ffi-bindings'; +import { + DataTrackOptions as ProtoDataTrackOptions, + PublishDataTrackRequest, +} from '@livekit/rtc-ffi-bindings'; import type { PathLike } from 'node:fs'; import { open, stat } from 'node:fs/promises'; import { @@ -78,6 +83,12 @@ import { type TextStreamInfo, TextStreamWriter, } from './data_streams/index.js'; +import { + type DataTrackOptions, + type LocalDataTrack, + PublishDataTrackError, +} from './data_tracks/index.js'; +import { LocalDataTrack as LocalDataTrackImpl } from './data_tracks/index.js'; import { FfiClient, FfiHandle } from './ffi_client.js'; import { log } from './log.js'; import { type PerformRpcParams, RpcError, type RpcInvocationData } from './rpc.js'; @@ -770,6 +781,41 @@ export class LocalParticipant extends Participant { } } + /** + * Publishes a data track. + * + * @returns The published data track. Use {@link LocalDataTrack.tryPush} to send data frames. + * @throws {@link PublishDataTrackError} if there is an error publishing the data track. + * + * For LiveKit OSS, v1.11.0 or higher is required to use data tracks. + */ + async publishDataTrack(options: DataTrackOptions): Promise { + const protoOpts = new ProtoDataTrackOptions({ name: options.name }); + + const res = FfiClient.instance.request({ + message: { + case: 'publishDataTrack', + value: new PublishDataTrackRequest({ + localParticipantHandle: this.ffi_handle.handle, + options: protoOpts, + }), + }, + }); + + const cb = await FfiClient.instance.waitFor((ev) => { + return ev.message.case === 'publishDataTrack' && ev.message.value.asyncId === res.asyncId; + }); + + switch (cb.result.case) { + case 'track': + return new LocalDataTrackImpl(cb.result.value); + case 'error': + throw new PublishDataTrackError(cb.result.value.message!); + default: + throw new PublishDataTrackError('Unknown error publishing data track'); + } + } + /** * Initiate an RPC call to a remote participant. * @param params - Parameters for initiating the RPC call, see {@link PerformRpcParams} diff --git a/packages/livekit-rtc/src/room.ts b/packages/livekit-rtc/src/room.ts index e5358d57..10d4c9f7 100644 --- a/packages/livekit-rtc/src/room.ts +++ b/packages/livekit-rtc/src/room.ts @@ -33,6 +33,7 @@ import type { TextStreamHandler, TextStreamInfo, } from './data_streams/types.js'; +import { RemoteDataTrack } from './data_tracks/index.js'; import type { E2EEOptions } from './e2ee.js'; import { E2EEManager, defaultE2EEOptions } from './e2ee.js'; import { FfiClient, FfiClientEvent, FfiHandle } from './ffi_client.js'; @@ -698,6 +699,11 @@ export class Room extends (EventEmitter as new () => TypedEmitter } else if (ev.case === 'tokenRefreshed') { this._token = ev.value.token; this.emit('tokenRefreshed'); + } else if (ev.case === 'dataTrackPublished') { + const remoteDataTrack = new RemoteDataTrack(ev.value.track!); + this.emit(RoomEvent.DataTrackPublished, remoteDataTrack); + } else if (ev.case === 'dataTrackUnpublished') { + this.emit(RoomEvent.DataTrackUnpublished, ev.value.sid!); } }; @@ -920,6 +926,8 @@ export type RoomCallbacks = { roomUpdated: () => void; moved: () => void; tokenRefreshed: () => void; + dataTrackPublished: (track: RemoteDataTrack) => void; + dataTrackUnpublished: (sid: string) => void; }; export enum RoomEvent { @@ -955,4 +963,6 @@ export enum RoomEvent { RoomUpdated = 'roomUpdated', Moved = 'moved', TokenRefreshed = 'tokenRefreshed', + DataTrackPublished = 'dataTrackPublished', + DataTrackUnpublished = 'dataTrackUnpublished', } diff --git a/packages/livekit-rtc/src/tests/e2e.test.ts b/packages/livekit-rtc/src/tests/e2e.test.ts index 5d765135..2c0ae578 100644 --- a/packages/livekit-rtc/src/tests/e2e.test.ts +++ b/packages/livekit-rtc/src/tests/e2e.test.ts @@ -12,6 +12,7 @@ import { ConnectionState, LocalAudioTrack, ParticipantKind, + type RemoteDataTrack, Room, RoomEvent, RpcError, @@ -515,6 +516,88 @@ describeE2E('livekit-rtc e2e', () => { testTimeoutMs * 2, ); + it( + 'publishes and subscribes to a data track', + async () => { + const FRAME_COUNT = 5; + const PAYLOAD_SIZE = 64; + const TRACK_NAME = 'test-track'; + + const { rooms } = await connectTestRooms(2); + const [subscriberRoom, publisherRoom] = rooms; + const publisherIdentity = publisherRoom!.localParticipant!.identity; + + const unpublishedEvent = waitForRoomEvent( + subscriberRoom!, + RoomEvent.DataTrackUnpublished, + testTimeoutMs, + (sid: string) => sid, + ); + + const publishedEvent = waitForRoomEvent( + subscriberRoom!, + RoomEvent.DataTrackPublished, + testTimeoutMs, + (track: RemoteDataTrack) => track, + ); + + const localTrack = await publisherRoom!.localParticipant!.publishDataTrack({ + name: TRACK_NAME, + }); + expect(localTrack.info.sid).toBeTruthy(); + expect(localTrack.info.name).toBe(TRACK_NAME); + expect(localTrack.isPublished()).toBe(true); + + const remoteTrack = await publishedEvent; + expect(remoteTrack.info.name).toBe(TRACK_NAME); + expect(remoteTrack.publisherIdentity).toBe(publisherIdentity); + + const stream = remoteTrack.subscribe(); + const reader = stream.getReader(); + + const pushTask = (async () => { + for (let i = 0; i < FRAME_COUNT; i++) { + localTrack.tryPush({ + payload: new Uint8Array(PAYLOAD_SIZE).fill(i), + userTimestamp: BigInt(Date.now()), + }); + await delay(100); + } + await localTrack.unpublish(); + })(); + + const readTask = (async () => { + let recvCount = 0; + while (true) { + const { done, value: frame } = await reader.read(); + if (done) break; + const firstByte = frame.payload[0]!; + expect(frame.payload.every((b) => b === firstByte)).toBe(true); + expect(frame.payload.byteLength).toBe(PAYLOAD_SIZE); + expect(frame.userTimestamp).toBeDefined(); + const latency = (Date.now() - Number(frame.userTimestamp!)) / 1000; + expect(latency).toBeLessThan(5.0); + recvCount++; + } + return recvCount; + })(); + + const recvCount = await withTimeout( + Promise.all([pushTask, readTask]).then(([, count]) => count), + testTimeoutMs, + 'Timed out during data track test', + ); + expect(recvCount).toBeGreaterThan(0); + + const unpublishedSid = await unpublishedEvent; + expect(unpublishedSid).toBe(localTrack.info.sid); + + reader.releaseLock(); + await Promise.all(rooms.map((r) => r.disconnect())); + }, + testTimeoutMs * 2, + ); + it( 'cleans up stream controllers when disconnecting during an active stream', async () => { diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 0265559a..db80a34d 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -110,6 +110,25 @@ importers: specifier: ^4.7.1 version: 4.17.0 + examples/data-tracks: + dependencies: + '@livekit/rtc-node': + specifier: workspace:* + version: link:../../packages/livekit-rtc + dotenv: + specifier: ^16.4.5 + version: 16.4.5 + livekit-server-sdk: + specifier: workspace:* + version: link:../../packages/livekit-server-sdk + devDependencies: + '@types/node': + specifier: ^20.10.4 + version: 20.19.11 + tsx: + specifier: ^4.7.1 + version: 4.17.0 + examples/publish-wav: dependencies: '@livekit/rtc-node': @@ -2583,9 +2602,6 @@ packages: get-tsconfig@4.10.1: resolution: {integrity: sha512-auHyJ4AgMz7vgS8Hp3N6HXSmlMdUyhSUrfBF16w153rxtLIEOE+HGqaBppczZvnHLqQJfiHotCYpNhl0lUROFQ==} - get-tsconfig@4.8.0: - resolution: {integrity: sha512-Pgba6TExTZ0FJAn1qkJAjIeKoDJ3CsI2ChuLohJnZl/tTU8MVrq3b+2t5UOPfRa4RMsorClBjJALkJUMjG1PAw==} - git-repo-info@2.1.1: resolution: {integrity: sha512-8aCohiDo4jwjOwma4FmYFd3i97urZulL8XL24nIPxuE+GZnfsAyy/g2Shqx6OjUiFKUXZM+Yy+KHnOmmA3FVcg==} engines: {node: '>= 4.0'} @@ -6350,10 +6366,6 @@ snapshots: dependencies: resolve-pkg-maps: 1.0.0 - get-tsconfig@4.8.0: - dependencies: - resolve-pkg-maps: 1.0.0 - git-repo-info@2.1.1: {} glob-parent@5.1.2: @@ -7573,7 +7585,7 @@ snapshots: tsx@4.17.0: dependencies: esbuild: 0.23.1 - get-tsconfig: 4.8.0 + get-tsconfig: 4.10.1 optionalDependencies: fsevents: 2.3.3