diff --git a/package.json b/package.json index ed50085..103b952 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "assemblyai", - "version": "4.33.3", + "version": "4.34.0", "description": "The AssemblyAI JavaScript SDK provides an easy-to-use interface for interacting with the AssemblyAI API, which supports async and real-time transcription, as well as the latest LeMUR models.", "engines": { "node": ">=18" diff --git a/samples/streaming-dual-channel-mic-system/README.md b/samples/streaming-dual-channel-mic-system/README.md new file mode 100644 index 0000000..8172f8e --- /dev/null +++ b/samples/streaming-dual-channel-mic-system/README.md @@ -0,0 +1,240 @@ +# streaming-dual-channel-mic-system + +Browser sample that streams **microphone + system audio** as a single mixed mono +stream to AssemblyAI's Streaming v3 endpoint, with per-word physical-channel +attribution (`mic` / `system`) layered on top of AAI's voice diarization +(`speaker_label`). + +## Run + +```bash +npm install +npm run dev +``` + +Then open the printed URL in Chrome, paste a streaming temporary token, and +click **Start**. + +Note: the sample's `package.json` references `"assemblyai": "file:../.."` so it +builds against the local SDK source. Run `pnpm build` (or `npm run build`) once +at the SDK root before installing here. + +## Getting a temporary token + +API-key auth is unsupported in browsers. Mint a token from your backend: + +```ts +const token = await client.streaming.createTemporaryToken({ + expires_in_seconds: 600, +}) +``` + +## Swappable VAD + +The SDK's `channelAttribution.createVad` factory is an extension point — any +class that implements `VadDetector` (`process(frame: Float32Array) → { active, +energy }` plus `reset()`) can replace the default `EnergyVad`. To plug in a +custom VAD (Silero / DNN / your own), pass a factory: + +```ts +channelAttribution: { + createVad: (channelName) => new YourCustomVadDetector(channelName), +} +``` + +The factory is called once per declared channel at transcriber construction +time, and the channel name (`mic` / `system` / whatever you declared in +`channels: [{ name }]`) is passed in — so factories that wrap higher-level VAD +libraries (which manage their own audio source) can map each `VadDetector` +instance to its corresponding channel. + +This sample uses the default `EnergyVad` and exposes its tuning knobs via the +sliders described below. + +## Resolve unknown channels + +[`channelAttribution.resolveUnknownChannelsMethod`](../../src/types/streaming/index.ts) +controls how words whose per-word VAD attribution resolved to `"unknown"` are +filled in. Confident per-word VAD decisions (`"mic"` / `"system"`) are never +modified by any strategy. Default: `"window"`. + +The sample's "Resolve unknown channels" dropdown switches between: + +- **`window`** (default): look at the dominant non-`"unknown"` channel among + ±2 neighboring words in the same turn. Ignores `speaker_label`, so it + works even when AAI re-uses a label for two physically distinct voices. + Words with no non-`"unknown"` neighbors stay `"unknown"`. +- **`speaker-history`**: accumulate per-`speaker_label` per-channel active + VAD energy across the session. Fill `"unknown"` words with the speaker's + dominant channel when their total evidence clears + `speakerHistoryMinRmsEvidence` (default `0.5`) and beats runner-up by + `speakerHistoryDominanceRatio` (default `3`). Robust when speaker labels + are stable; does nothing when a speaker's evidence is split. +- **`none`**: disable resolution. `"unknown"` words render as-is. + +Resolved words are flagged with `word.channelResolved = true`, and the sample +renders them with a trailing asterisk (e.g. `[mic*/spk A]`) so you can see +exactly when resolution fired. + +### EnergyVad tuning sliders + +The sample lets you tune the default +[`EnergyVad`](../../src/services/streaming/energy-vad.ts) parameters +in real time: + +- **Threshold ratio** (default `3`, range `1.5`–`5`, step `0.5`): the VAD + trips when `frameRMS > noiseFloor × thresholdRatio`. Lower values are more + sensitive (catch quieter speech, more false positives on background). + Higher values miss quiet utterance onsets/offsets. +- **Hangover frames** (default `10` = ~200 ms, range `0`–`25`, step `5`): + how many frames the VAD stays "active" after the last detected speech + frame. Longer hangovers smooth attribution across brief silences within + an utterance. + +The slider values are baked into the `EnergyVad` instances created at start +time via `channelAttribution.createVad`; they cannot be changed +mid-session — Stop and Start again to apply new values. + +### Speaker-change log + +When `speakerLabels` is enabled and a turn's words include a transition in +the composite `(channel, speaker_label)` key vs. the previous final word, +the sample logs a line like: + + [Speaker change: mic-A → system-B] + +This is the recommended pattern for transcript renderers that want to split +on speaker boundaries: compare the `(channel, speaker_label)` composite key +between consecutive words. `channel` reliably reports the physical source +(VAD-derived); `speaker_label` is AAI's acoustic diarization on the mixed +mono stream. Either change is a real boundary. + +## Platform caveats + +- **macOS:** `getDisplayMedia({ audio: true })` does **not** capture system + audio by default. Install [BlackHole](https://existential.audio/blackhole/) or + [Loopback.app](https://rogueamoeba.com/loopback/) and route system audio + through the loopback device to make it available. +- **Windows:** sharing the whole screen via the picker exposes full system + audio; sharing only a tab exposes just that tab's audio. + +## Speakers + open mic: apply echo cancellation at capture + +When the user listens to system audio through **speakers** (rather than +headphones) and their mic is open, the mic physically picks up the speaker +playback. The two channels then carry highly-correlated audio at similar +amplitudes, and the energy-based attribution can't reliably tell apart +"real mic speech" from "speakers played into mic." + +**Transcription accuracy is unaffected** — AAI still transcribes what was +said. What's affected is **per-word channel attribution**: words that +actually came from system audio may be tagged as `mic`. If you don't use +the per-word `channel` field downstream, you can ignore this. If you do — +for instance, to render `[mic]` / `[sys]` prefixes in a transcript UI — +apply echo cancellation **at the capture layer**, before audio reaches +the SDK. Two examples below. + +### Example 1: Browser (`getUserMedia` with built-in AEC) + +If you're capturing in the browser (like this sample app does), +`getUserMedia` already exposes Chrome's WebRTC AEC. Pass +`echoCancellation: true` when requesting the mic stream: + +```ts +const micStream = await navigator.mediaDevices.getUserMedia({ + audio: { + echoCancellation: true, // Subtracts speaker playback from mic. + noiseSuppression: true, // Optional: cleaner ambient. + autoGainControl: true, // Optional: smooths mic level. + }, +}) + +// Then hand the cleaned stream to the SDK as the mic channel. +const capture = new DualChannelCapture({ + micStream, + systemStream, // from getDisplayMedia({audio: true}) + transcriber, +}) +``` + +This is what the browser sample app already does — it's why `[mic]` +attribution works correctly even with speakers playing into the mic. + +### Example 2: Native / Node (swap in a DNN VAD via `createVad`) + +In server-side or native runtimes (Node, Electron, the Swift helpers +behind native CLIs, etc.) there is no `getUserMedia`. The right answer is +still to do echo cancellation **at capture** — macOS has +`AVAudioEngine.setVoiceProcessingEnabled(true)`, Linux has PulseAudio's +`module-echo-cancel`, telephony stacks usually have it in the codec. +**Use what your capture layer provides.** + +If platform-level AEC isn't available, the next-best option is to plug +in a DNN voice-activity detector via `channelAttribution.createVad`. A +DNN VAD distinguishes "real speech" from "playback that the mic +recaptured" using spectral characteristics (rather than energy), so it's +much more robust to speaker leak. [Silero VAD](https://github.com/snakers4/silero-vad) +is the typical choice; the [`@ricky0123/vad`](https://www.npmjs.com/package/@ricky0123/vad) +package bundles it for browser and Node. + +```ts +import { MicVAD } from "@ricky0123/vad-web" +import type { VadDetector, VadDetectorResult } from "assemblyai" + +// Adapter: wrap a Silero session as the SDK's VadDetector interface. +class SileroVad implements VadDetector { + constructor(private readonly speechProb: () => number) {} + process(frame: Float32Array): VadDetectorResult { + const p = this.speechProb() + let sumSq = 0 + for (let i = 0; i < frame.length; i++) sumSq += frame[i] * frame[i] + const rms = Math.sqrt(sumSq / Math.max(1, frame.length)) + return { active: p > 0.5, energy: rms } + } + reset(): void {} +} + +// One MicVAD per channel. +const micVad = await MicVAD.new({ + /* ... */ +}) +const systemVad = await MicVAD.new({ + /* ... */ +}) + +const transcriber = client.streaming.transcriber({ + speechModel: "u3-rt-pro", + sampleRate: 16_000, + channels: [{ name: "mic" }, { name: "system" }], + channelAttribution: { + createVad: (channelName) => + new SileroVad( + channelName === "mic" + ? () => micVad.lastSpeechProb() + : () => systemVad.lastSpeechProb(), + ), + }, +}) +``` + +The factory is called once per declared channel at transcriber +construction time, so it's a clean place to wire each channel's VAD to +its own Silero session. + +### Why the SDK can't ship AEC itself + +Echo cancellation belongs at the capture layer — the moment audio enters +your application — because: + +1. Every platform (browser, macOS, iOS, Linux, Windows, telephony) has + its own AEC implementation tuned for its own audio stack. The SDK + only sees PCM after capture, by which point platform-specific delay + compensation and double-talk handling are out of reach. +2. A pure-JS AEC inside the SDK would re-invent what the OS / browser + already does well, with worse latency and worse quality. +3. Customers' AEC needs differ. Voice agents want aggressive AEC; + meeting recorders want light-touch AEC that doesn't suppress + overlapping speech. The capture layer is where that choice is made. + +Channel attribution in this SDK assumes capture-layer AEC is already +applied when speaker leak is a real concern. diff --git a/samples/streaming-dual-channel-mic-system/index.html b/samples/streaming-dual-channel-mic-system/index.html new file mode 100644 index 0000000..8361bd5 --- /dev/null +++ b/samples/streaming-dual-channel-mic-system/index.html @@ -0,0 +1,424 @@ + + + + + + AssemblyAI — Dual-Channel Mic + System + + + +

Dual-Channel Mic + System

+

+ Streams mic + system audio (via getDisplayMedia) as a single + mixed mono stream to AssemblyAI's Streaming v3 endpoint, with per-word + channel attribution. Requires a temporary token; never expose your API key + in the browser. +

+
+ How do I get a temporary token? +

+ Grab an API key from your + AssemblyAI dashboard, then run this in a terminal to mint a 60-second token (replace + <YOUR_API_KEY>): +

+
+curl -G https://streaming.assemblyai.com/v3/token \
+  -H "Authorization: <YOUR_API_KEY>" \
+  -d expires_in_seconds=60
+

Paste the token value from the response below.

+
+ +

+ + Resolved words are marked with a trailing * (e.g. + [mic*]). +

+

+ + Caps AAI diarization. Leave blank to let the server decide. +

+
+ + +
+
+
+

+ + +

+
+
+ mic +
+
+
+ –∞ dB + +
+
+ sys +
+
+
+ –∞ dB + +
+
+
+
+
+ +
+
+
+ + +
+
+ + + + diff --git a/samples/streaming-dual-channel-mic-system/main.ts b/samples/streaming-dual-channel-mic-system/main.ts new file mode 100644 index 0000000..00cf44c --- /dev/null +++ b/samples/streaming-dual-channel-mic-system/main.ts @@ -0,0 +1,392 @@ +import { + DualChannelCapture, + EnergyVad, + StreamingTranscriber, + type TurnEvent, +} from "assemblyai" + +// The SDK accepts an external VAD via channelAttribution.createVad. For example, +// to plug in a Silero / DNN VAD, pass: +// +// channelAttribution: { +// createVad: (channelName) => new YourCustomVadDetector(channelName), +// } +// +// Where YourCustomVadDetector implements the SDK's VadDetector interface +// (process(frame: Float32Array) → { active, energy } and reset()). The default +// is EnergyVad, which we use here. + +const tokenInput = document.getElementById("token") as HTMLInputElement +const startBtn = document.getElementById("start") as HTMLButtonElement +const stopBtn = document.getElementById("stop") as HTMLButtonElement +const resolveMethodSelect = document.getElementById( + "resolve-method", +) as HTMLSelectElement +const thresholdRatioInput = document.getElementById( + "threshold-ratio", +) as HTMLInputElement +const thresholdRatioValue = document.getElementById( + "threshold-ratio-value", +) as HTMLSpanElement +const hangoverFramesInput = document.getElementById( + "hangover-frames", +) as HTMLInputElement +const hangoverFramesValue = document.getElementById( + "hangover-frames-value", +) as HTMLSpanElement +const maxSpeakersInput = document.getElementById( + "max-speakers", +) as HTMLInputElement +const output = document.getElementById("output") as HTMLDivElement +const events = document.getElementById("events") as HTMLDivElement +const panelTabBtns = + document.querySelectorAll(".panel-tab-btn") + +/** Append one streaming event to the Events panel as a single JSON line. */ +function logEvent(kind: string, payload: unknown): void { + const line = document.createElement("div") + line.className = `event-line event-${kind}` + line.textContent = JSON.stringify({ kind, ...(payload as object) }) + events.appendChild(line) + events.scrollTop = events.scrollHeight +} + +// Panel tab switching. +for (const btn of panelTabBtns) { + btn.addEventListener("click", () => { + const active = btn.dataset.panelTab + for (const b of panelTabBtns) b.classList.toggle("active", b === btn) + output.classList.toggle("hidden", active !== "output") + events.classList.toggle("hidden", active !== "events") + }) +} +const micFill = document.getElementById("mic-fill") as HTMLDivElement +const sysFill = document.getElementById("sys-fill") as HTMLDivElement +const micDb = document.getElementById("mic-db") as HTMLSpanElement +const sysDb = document.getElementById("sys-db") as HTMLSpanElement +const micFlag = document.getElementById("mic-flag") as HTMLSpanElement +const sysFlag = document.getElementById("sys-flag") as HTMLSpanElement +const diag = document.getElementById("diag") as HTMLDivElement + +let transcriber: StreamingTranscriber | undefined +let capture: DualChannelCapture | undefined +let micStream: MediaStream | undefined +let systemStream: MediaStream | undefined + +// Wire slider value labels up-front so the user sees what the slider is set to +// before they press Start. +thresholdRatioInput.addEventListener("input", () => { + thresholdRatioValue.textContent = thresholdRatioInput.value +}) +hangoverFramesInput.addEventListener("input", () => { + hangoverFramesValue.textContent = hangoverFramesInput.value +}) + +function appendLine(line: string, className?: string): void { + const div = document.createElement("div") + if (className) div.className = className + div.textContent = line + output.appendChild(div) + output.scrollTop = output.scrollHeight +} + +function rmsToDbStr(rms: number): string { + if (rms < 1e-6) return "–∞ dB" + return `${(20 * Math.log10(rms)).toFixed(1)} dB` +} + +function rmsToBarPct(rms: number): number { + if (rms < 1e-6) return 0 + const db = 20 * Math.log10(rms) + return Math.max(0, Math.min(100, ((db + 60) / 60) * 100)) +} + +async function start(): Promise { + const token = tokenInput.value.trim() + if (!token) { + appendLine("Need a temporary token.") + return + } + startBtn.disabled = true + output.replaceChildren() + + micStream = await navigator.mediaDevices.getUserMedia({ + audio: { + echoCancellation: true, + noiseSuppression: true, + autoGainControl: true, + }, + }) + systemStream = await navigator.mediaDevices.getDisplayMedia({ + audio: true, + video: true, + }) + if (systemStream.getAudioTracks().length === 0) { + appendLine( + "No audio track in display capture. On macOS, install BlackHole or similar.", + ) + micStream.getTracks().forEach((t) => t.stop()) + systemStream.getTracks().forEach((t) => t.stop()) + startBtn.disabled = false + return + } + + const resolveMethod = resolveMethodSelect.value as + | "none" + | "window" + | "speaker-history" + const thresholdRatio = parseFloat(thresholdRatioInput.value) + const hangoverFrames = parseInt(hangoverFramesInput.value, 10) + const maxSpeakersRaw = maxSpeakersInput.value.trim() + const maxSpeakers = + maxSpeakersRaw === "" ? undefined : parseInt(maxSpeakersRaw, 10) + + const channelAttribution: NonNullable< + ConstructorParameters[0]["channelAttribution"] + > = { + createVad: () => new EnergyVad({ thresholdRatio, hangoverFrames }), + resolveUnknownChannelsMethod: resolveMethod, + } + + transcriber = new StreamingTranscriber({ + token, + sampleRate: 16_000, + speechModel: "u3-rt-pro", + speakerLabels: true, + ...(maxSpeakers !== undefined && { maxSpeakers }), + continuousPartials: true, + channels: [{ name: "mic" }, { name: "system" }], + channelAttribution, + minTurnSilence: 400, + maxTurnSilence: 1000, + vadThreshold: 0.5, + }) + transcriber.on("open", (e) => logEvent("Begin", e)) + transcriber.on("error", (err) => { + appendLine(`[error] ${err.message}`) + logEvent("Error", { message: err.message }) + }) + transcriber.on("close", (code, reason) => { + appendLine(`[close] ${code} ${reason}`) + logEvent("Close", { code, reason }) + }) + transcriber.on("warning", (e) => logEvent("Warning", e)) + + // Meter state: EMA-smoothed peak RMS per channel + a "we ever saw audio?" + // diagnostic that flags after ~3 s of silence on the system side. + const ema = new Map([ + ["mic", 0], + ["system", 0], + ]) + let sysSawAudio = false + const startedAt = performance.now() + const EMA_ALPHA = 0.3 + const SYS_AUDIO_THRESHOLD = 5e-4 + + transcriber.on("vad", (frame) => { + const prev = ema.get(frame.channel) ?? 0 + const next = prev * (1 - EMA_ALPHA) + frame.rms * EMA_ALPHA + ema.set(frame.channel, next) + + if (frame.channel === "system" && frame.rms > SYS_AUDIO_THRESHOLD) { + if (!sysSawAudio) diag.textContent = "" + sysSawAudio = true + } + + if (frame.channel === "mic") { + micFill.style.width = `${Math.min(100, rmsToBarPct(next))}%` + micDb.textContent = rmsToDbStr(next) + micFlag.textContent = frame.active ? "● speaking" : "" + micFlag.className = `meter-flag${frame.active ? " active" : ""}` + } else if (frame.channel === "system") { + sysFill.style.width = `${Math.min(100, rmsToBarPct(next))}%` + sysDb.textContent = rmsToDbStr(next) + sysFlag.textContent = frame.active ? "● speaking" : "" + sysFlag.className = `meter-flag${frame.active ? " active" : ""}` + } + + if ( + !sysSawAudio && + performance.now() - startedAt > 3000 && + diag.textContent === "" + ) { + diag.textContent = + "No system audio detected after 3s. Common causes: (1) didn't tick 'Share tab audio' in the picker, (2) shared a different tab than the one playing audio, (3) on macOS you need a virtual loopback driver (BlackHole) for non-tab system audio." + } + }) + + // Per-turn container divs. Each container holds N bubbles (one per + // contiguous same-channel run within the turn) plus the rollup line on + // end_of_turn. Keyed by turn_order so partials replace the same container. + const turnContainers = new Map() + // Last committed turn-level (channel, speaker_label) composite — used to + // detect speaker changes between consecutive *finalized turns*. + let lastFinalTurnComposite: string | undefined + + /** + * Split a turn's words into contiguous same-channel runs. Words within a run + * share a channel value (treating `undefined` as `"unknown"`). Each run + * becomes its own bubble so a turn that contains both mic and system speech + * renders as multiple bubbles aligned to their respective sides. + */ + function groupByChannel(turn: TurnEvent) { + const runs: Array<{ + channel: string + anyResolved: boolean + words: TurnEvent["words"] + }> = [] + for (const w of turn.words) { + const ch = w.channel ?? "unknown" + const last = runs[runs.length - 1] + if (last && last.channel === ch) { + last.words.push(w) + if (w.channelResolved) last.anyResolved = true + } else { + runs.push({ channel: ch, anyResolved: !!w.channelResolved, words: [w] }) + } + } + return runs + } + + function renderTurnContainer( + container: HTMLElement, + turn: TurnEvent, + isFinal: boolean, + ): void { + container.replaceChildren() + const runs = groupByChannel(turn) + + for (let r = 0; r < runs.length; r++) { + const run = runs[r] + const bubble = document.createElement("div") + const align = run.channel === "mic" ? "align-right" : "align-left" + bubble.className = `turn-row ${align} ${isFinal ? "final" : "partial"}` + + // Show the partial marker only on the first bubble of an in-progress turn. + if (!isFinal && r === 0) { + const marker = document.createElement("span") + marker.className = "partial-marker" + marker.textContent = "[partial]" + bubble.appendChild(marker) + } + + for (const w of run.words) { + const chip = document.createElement("span") + chip.className = "word-chip" + const channelLabel = `${w.channel ?? "unknown"}${ + w.channelResolved ? "*" : "" + }` + // Per AAI docs, `word.speaker` is only set on final words and may + // still be absent; `turn.speaker_label` is only populated on the + // final turn event. On partials we have neither, so we render the + // channel only. + const speakerForWord = w.speaker ?? turn.speaker_label + const speakerChip = speakerForWord ? `/spk ${speakerForWord}` : "" + chip.textContent = `[${channelLabel}${speakerChip}] ${w.text} ` + bubble.appendChild(chip) + } + + container.appendChild(bubble) + } + } + + transcriber.on("turn", (turn) => { + logEvent("Turn", turn) + let container = turnContainers.get(turn.turn_order) + if (!container) { + container = document.createElement("div") + container.className = "turn-container" + output.appendChild(container) + turnContainers.set(turn.turn_order, container) + } + + renderTurnContainer(container, turn, turn.end_of_turn) + output.scrollTop = output.scrollHeight + + if (!turn.end_of_turn) return + + turnContainers.delete(turn.turn_order) + + // Turn-level speaker-change detection: compare this turn's primary + // (channel, speaker_label) to the previous finalized turn's. + const turnComposite = + turn.channel && turn.speaker_label + ? `${turn.channel}-${turn.speaker_label}` + : undefined + if ( + turnComposite && + lastFinalTurnComposite && + lastFinalTurnComposite !== turnComposite + ) { + const log = document.createElement("div") + log.className = "speaker-change-log" + log.textContent = `[Speaker change: ${lastFinalTurnComposite} → ${turnComposite}]` + container.before(log) + } + if (turnComposite) lastFinalTurnComposite = turnComposite + + // End-of-turn rollup line. + const rollupLine = document.createElement("div") + rollupLine.className = "rollup-line" + rollupLine.textContent = `-- End of turn #${turn.turn_order}: speaker_label: ${ + turn.speaker_label ?? "?" + } | Most active channel: ${turn.channel ?? "unknown"}` + container.after(rollupLine) + output.scrollTop = output.scrollHeight + }) + + // Expose so stop() can reset session-scoped state. + ;( + globalThis as unknown as { __resetTurnState?: () => void } + ).__resetTurnState = () => { + turnContainers.clear() + lastFinalTurnComposite = undefined + } + + capture = new DualChannelCapture({ micStream, systemStream, transcriber }) + capture.on("error", (err) => appendLine(`[capture error] ${err.message}`)) + + await transcriber.connect() + await capture.start() + stopBtn.disabled = false + resolveMethodSelect.disabled = true + thresholdRatioInput.disabled = true + hangoverFramesInput.disabled = true + appendLine("Streaming…") +} + +async function stop(): Promise { + stopBtn.disabled = true + await capture?.stop() + await transcriber?.close() + micStream?.getTracks().forEach((t) => t.stop()) + systemStream?.getTracks().forEach((t) => t.stop()) + capture = undefined + transcriber = undefined + micStream = undefined + systemStream = undefined + ;( + globalThis as unknown as { __resetTurnState?: () => void } + ).__resetTurnState?.() + micFill.style.width = "0%" + sysFill.style.width = "0%" + micDb.textContent = "–∞ dB" + sysDb.textContent = "–∞ dB" + micFlag.textContent = "" + sysFlag.textContent = "" + startBtn.disabled = false + resolveMethodSelect.disabled = false + thresholdRatioInput.disabled = false + hangoverFramesInput.disabled = false + appendLine("Stopped.") +} + +startBtn.addEventListener("click", () => { + void start().catch((err) => { + appendLine(`[start error] ${(err as Error).message}`) + startBtn.disabled = false + }) +}) +stopBtn.addEventListener("click", () => { + void stop() +}) diff --git a/samples/streaming-dual-channel-mic-system/package.json b/samples/streaming-dual-channel-mic-system/package.json new file mode 100644 index 0000000..44c2729 --- /dev/null +++ b/samples/streaming-dual-channel-mic-system/package.json @@ -0,0 +1,18 @@ +{ + "name": "streaming-dual-channel-mic-system", + "private": true, + "version": "1.0.0", + "type": "module", + "scripts": { + "dev": "vite", + "build": "vite build", + "preview": "vite preview" + }, + "devDependencies": { + "typescript": "^5.4.5", + "vite": "^5.4.10" + }, + "dependencies": { + "assemblyai": "file:../.." + } +} diff --git a/samples/streaming-dual-channel-mic-system/tsconfig.json b/samples/streaming-dual-channel-mic-system/tsconfig.json new file mode 100644 index 0000000..38d75d5 --- /dev/null +++ b/samples/streaming-dual-channel-mic-system/tsconfig.json @@ -0,0 +1,15 @@ +{ + "compilerOptions": { + "target": "ES2022", + "module": "ESNext", + "moduleResolution": "Bundler", + "lib": ["ES2022", "DOM", "DOM.Iterable"], + "strict": true, + "skipLibCheck": true, + "isolatedModules": true, + "noEmit": true, + "useDefineForClassFields": true, + "esModuleInterop": true + }, + "include": ["main.ts"] +} diff --git a/src/exports/streaming.ts b/src/exports/streaming.ts index eb145dd..d0db881 100644 --- a/src/exports/streaming.ts +++ b/src/exports/streaming.ts @@ -1,4 +1,11 @@ export * from "../types/asyncapi.generated"; export * from "../types/realtime"; export * from "../types/helpers"; +export * from "../types/streaming/dual-channel"; export * from "../services/realtime/service"; +export * from "../services/streaming/service"; +export * from "../services/streaming/factory"; +export * from "../services/streaming/browser/dual-channel-capture"; +export * from "../services/streaming/energy-vad"; +export * from "../services/streaming/label-mapper"; +export * from "../services/streaming/resampler"; diff --git a/src/services/index.ts b/src/services/index.ts index d0441b4..0c12913 100644 --- a/src/services/index.ts +++ b/src/services/index.ts @@ -8,7 +8,18 @@ import { } from "./realtime"; import { TranscriptService } from "./transcripts"; import { FileService } from "./files"; -import { StreamingTranscriber, StreamingTranscriberFactory } from "./streaming"; +import { + StreamingTranscriber, + StreamingTranscriberFactory, + DualChannelCapture, + EnergyVad, + LinearResampler, + VadTimeline, + attributeTurn, + attributeWord, + rollUpTurnChannel, + float32ToPcm16, +} from "./streaming"; const defaultBaseUrl = "https://api.assemblyai.com"; const defaultStreamingUrl = "https://streaming.assemblyai.com"; @@ -71,4 +82,12 @@ export { TranscriptService, FileService, StreamingTranscriber, + DualChannelCapture, + EnergyVad, + LinearResampler, + VadTimeline, + attributeTurn, + attributeWord, + rollUpTurnChannel, + float32ToPcm16, }; diff --git a/src/services/streaming/browser/dual-channel-capture.ts b/src/services/streaming/browser/dual-channel-capture.ts new file mode 100644 index 0000000..14bad56 --- /dev/null +++ b/src/services/streaming/browser/dual-channel-capture.ts @@ -0,0 +1,177 @@ +import { StreamingTranscriber } from "../service"; +import { BrowserOnlyError } from "../../../types/streaming/dual-channel"; + +export { BrowserOnlyError } from "../../../types/streaming/dual-channel"; +import { + PCM16_ENCODER_PROCESSOR_NAME, + Pcm16EncoderMessage, + pcm16EncoderWorkletSource, +} from "./worklets/pcm16-encoder"; + +const DEFAULT_TARGET_RATE = 16_000; +const DEFAULT_CHUNK_MS = 50; +const MIC_CHANNEL = "mic"; +const SYSTEM_CHANNEL = "system"; + +type ErrorListener = (err: Error) => void; + +export type DualChannelCaptureParams = { + /** Microphone MediaStream. Caller should set `echoCancellation: true` at `getUserMedia` time. */ + micStream: MediaStream; + /** System-audio MediaStream (e.g. `getDisplayMedia({ audio: true })`). */ + systemStream: MediaStream; + /** + * The transcriber to push tagged PCM into. MUST be constructed with + * `channels: [{ name: "mic" }, { name: "system" }]` so the per-channel + * `sendAudio` calls succeed. + */ + transcriber: StreamingTranscriber; + /** + * Target sample rate sent to the transcriber. Defaults to 16000. The + * AudioContext runs at the device's native rate; resampling happens inside + * the encoder worklet (forcing `AudioContext({ sampleRate })` is unreliable + * across browsers). + */ + targetSampleRate?: number; +}; + +/** + * Browser-only adapter that pumps two `MediaStream`s into a `StreamingTranscriber` + * configured for dual-channel mode. Each `MediaStream` runs through its own + * `pcm16-encoder` AudioWorklet (resample to `targetSampleRate`, encode to Int16 + * PCM); each PCM chunk is forwarded via `transcriber.sendAudio(pcm, { channel })`. + * + * All dual-channel orchestration (mixing, VAD, per-word attribution) lives inside + * `StreamingTranscriber` — this class is a pure I/O adapter. Non-browser runtimes + * can replicate its job by pushing tagged PCM into `transcriber.sendAudio` directly. + * + * Caller responsibilities: + * - **Echo cancellation** is set at `getUserMedia` time (`audio: { echoCancellation: true }`). + * - **System-audio capture** is platform-dependent. Chrome's `getDisplayMedia({ audio: true })` + * captures tab audio (and on Windows, full system audio when sharing the whole screen). + * macOS requires a virtual loopback driver (e.g. BlackHole) to expose system audio at all. + * - **Token auth.** Construct the transcriber with `token` — API-key auth is unsupported in browsers. + * - **Stream ownership.** `stop()` tears down the AudioContext but does NOT stop the + * `MediaStreamTrack`s passed in — callers own those. + */ +export class DualChannelCapture { + private readonly params: Required< + Omit + > & { targetSampleRate: number }; + private errorListener?: ErrorListener; + private context?: AudioContext; + private micSource?: MediaStreamAudioSourceNode; + private sysSource?: MediaStreamAudioSourceNode; + private micEncoder?: AudioWorkletNode; + private sysEncoder?: AudioWorkletNode; + private running = false; + + constructor(params: DualChannelCaptureParams) { + if (typeof globalThis.AudioContext === "undefined") { + throw new BrowserOnlyError(); + } + this.params = { + micStream: params.micStream, + systemStream: params.systemStream, + transcriber: params.transcriber, + targetSampleRate: params.targetSampleRate ?? DEFAULT_TARGET_RATE, + }; + } + + on(event: "error", listener: ErrorListener): void { + if (event === "error") this.errorListener = listener; + } + + /** + * Wire the capture pipeline and start pumping tagged PCM into the transcriber. + * The transcriber must already be connected. Returns once the worklet is + * registered and the audio graph is live. + */ + async start(): Promise { + if (this.running) { + throw new Error("DualChannelCapture already started"); + } + this.context = new AudioContext(); + + const blob = new Blob([pcm16EncoderWorkletSource], { + type: "application/javascript", + }); + const url = URL.createObjectURL(blob); + try { + await this.context.audioWorklet.addModule(url); + } finally { + URL.revokeObjectURL(url); + } + + this.micSource = this.context.createMediaStreamSource( + this.params.micStream, + ); + this.sysSource = this.context.createMediaStreamSource( + this.params.systemStream, + ); + + this.micEncoder = this.makeEncoder(MIC_CHANNEL); + this.sysEncoder = this.makeEncoder(SYSTEM_CHANNEL); + this.micSource.connect(this.micEncoder); + this.sysSource.connect(this.sysEncoder); + + this.running = true; + } + + private makeEncoder(channel: string): AudioWorkletNode { + const node = new AudioWorkletNode( + this.context!, + PCM16_ENCODER_PROCESSOR_NAME, + { + numberOfInputs: 1, + numberOfOutputs: 0, + channelCount: 1, + channelCountMode: "explicit", + channelInterpretation: "speakers", + processorOptions: { + targetRate: this.params.targetSampleRate, + chunkMs: DEFAULT_CHUNK_MS, + }, + }, + ); + node.port.onmessage = (e: MessageEvent) => { + try { + this.params.transcriber.sendAudio(e.data.pcm, { channel }); + } catch (err) { + this.errorListener?.(err as Error); + } + }; + return node; + } + + /** + * Tear down internal nodes and close the AudioContext. Does NOT stop the + * caller-provided MediaStream tracks — they remain available for preview UI, + * recording, etc. Idempotent. + */ + async stop(): Promise { + if (!this.running) return; + this.running = false; + + try { + this.micEncoder?.port.close(); + this.sysEncoder?.port.close(); + this.micEncoder?.disconnect(); + this.sysEncoder?.disconnect(); + this.micSource?.disconnect(); + this.sysSource?.disconnect(); + } catch { + // Disconnecting already-disconnected nodes throws in some browsers; ignore. + } + + if (this.context && this.context.state !== "closed") { + await this.context.close(); + } + + this.context = undefined; + this.micSource = undefined; + this.sysSource = undefined; + this.micEncoder = undefined; + this.sysEncoder = undefined; + } +} diff --git a/src/services/streaming/browser/worklets/pcm16-encoder.ts b/src/services/streaming/browser/worklets/pcm16-encoder.ts new file mode 100644 index 0000000..3ed4294 --- /dev/null +++ b/src/services/streaming/browser/worklets/pcm16-encoder.ts @@ -0,0 +1,70 @@ +/** + * AudioWorklet processor that ingests mono Float32 audio at the AudioContext's + * native sample rate, resamples to `targetRate` (linear interpolation, stateful + * across `process()` calls), packs to little-endian Int16 PCM, and posts + * fixed-size chunks via `port.postMessage` with a running `samplesSent` counter. + * + * `samplesSent` is in **target-rate samples**, so the main thread can derive a + * stream-relative timestamp = `samplesSent / targetRate * 1000` (ms) — the same + * frame AAI uses for `StreamingWord.start` / `.end`. + * + * Defined as a string so it can be registered via a Blob URL — the SDK ships as + * a single ESM file, so a separate `.js` worklet asset isn't viable. + */ +export const pcm16EncoderWorkletSource = ` +class Pcm16EncoderProcessor extends AudioWorkletProcessor { + constructor(options) { + super(); + const opts = (options && options.processorOptions) || {}; + this.targetRate = opts.targetRate || 16000; + this.chunkMs = opts.chunkMs || 50; + this.ratio = sampleRate / this.targetRate; + this.chunkSize = Math.round(this.targetRate * this.chunkMs / 1000); + this.buffer = new Int16Array(this.chunkSize); + this.bufferIdx = 0; + this.samplesSent = 0; + this.lastSample = 0; + this.fractional = 0; + } + + process(inputs) { + const input = inputs[0]; + if (!input || input.length === 0 || !input[0] || input[0].length === 0) { + return true; + } + const mono = input[0]; + let pos = this.fractional; + while (pos < mono.length) { + const i = Math.floor(pos); + const frac = pos - i; + const a = i === 0 ? this.lastSample : mono[i - 1]; + const b = mono[i]; + const sample = a + (b - a) * frac; + const clamped = sample < -1 ? -1 : sample > 1 ? 1 : sample; + this.buffer[this.bufferIdx++] = clamped < 0 ? clamped * 0x8000 : clamped * 0x7fff; + if (this.bufferIdx === this.chunkSize) { + const out = new Int16Array(this.chunkSize); + out.set(this.buffer); + this.samplesSent += this.chunkSize; + this.port.postMessage( + { pcm: out.buffer, samplesSent: this.samplesSent }, + [out.buffer], + ); + this.bufferIdx = 0; + } + pos += this.ratio; + } + this.lastSample = mono[mono.length - 1]; + this.fractional = pos - mono.length; + return true; + } +} +registerProcessor("aai-pcm16-encoder", Pcm16EncoderProcessor); +`; + +export const PCM16_ENCODER_PROCESSOR_NAME = "aai-pcm16-encoder"; + +export type Pcm16EncoderMessage = { + pcm: ArrayBuffer; + samplesSent: number; +}; diff --git a/src/services/streaming/energy-vad.ts b/src/services/streaming/energy-vad.ts new file mode 100644 index 0000000..44b9fe9 --- /dev/null +++ b/src/services/streaming/energy-vad.ts @@ -0,0 +1,75 @@ +import { + VadDetector, + VadDetectorResult, +} from "../../types/streaming/dual-channel"; + +export type EnergyVadParams = { + /** Threshold = noiseFloor * thresholdRatio. Default 3.0 (~ +9.5 dB above noise). */ + thresholdRatio?: number; + /** EMA smoothing for the noise-floor estimate when frame is non-speech. Default 0.05. */ + noiseFloorAlpha?: number; + /** Hangover in frames: stay "active" this many frames after the last speech frame. Default 10 (\~200 ms at 20 ms frames). */ + hangoverFrames?: number; + /** Initial noise floor estimate. Default 1e-4. Adaptive after the first non-speech frame. */ + initialNoiseFloor?: number; +}; + +/** + * Energy-based VAD with adaptive noise-floor tracking and hangover. Pure JS, + * no dependencies. Suitable for the "which physical channel is speaking" task + * because the channels are already physically separated at capture — the harder + * problem (speech vs. non-speech in the wild) is one a customer can swap in a + * DNN VAD for via the `createVad` parameter. + * + * Tuning notes: + * - thresholdRatio below 2 will treat anything above noise as speech (too sensitive). + * - thresholdRatio above 6 will miss quiet utterance onsets/offsets. + * - noiseFloorAlpha above 0.1 makes the floor track quickly (good for non-stationary + * background) but risks slowly adapting *up* to a sustained low voice. + */ +export class EnergyVad implements VadDetector { + private readonly thresholdRatio: number; + private readonly noiseFloorAlpha: number; + private readonly hangoverFrames: number; + private readonly initialNoiseFloor: number; + private noiseFloor: number; + private hangoverRemaining = 0; + + constructor(params: EnergyVadParams = {}) { + this.thresholdRatio = params.thresholdRatio ?? 3.0; + this.noiseFloorAlpha = params.noiseFloorAlpha ?? 0.05; + this.hangoverFrames = params.hangoverFrames ?? 10; + this.initialNoiseFloor = params.initialNoiseFloor ?? 1e-4; + this.noiseFloor = this.initialNoiseFloor; + } + + process(frame: Float32Array): VadDetectorResult { + let sumSq = 0; + for (let i = 0; i < frame.length; i++) { + sumSq += frame[i] * frame[i]; + } + const rms = frame.length > 0 ? Math.sqrt(sumSq / frame.length) : 0; + + const threshold = this.noiseFloor * this.thresholdRatio; + let active = rms > threshold; + + if (active) { + this.hangoverRemaining = this.hangoverFrames; + } else if (this.hangoverRemaining > 0) { + this.hangoverRemaining--; + active = true; + // While in hangover, do not update noise floor — RMS may still reflect tail energy. + } else { + this.noiseFloor = + this.noiseFloor * (1 - this.noiseFloorAlpha) + + rms * this.noiseFloorAlpha; + } + + return { active, energy: rms }; + } + + reset(): void { + this.noiseFloor = this.initialNoiseFloor; + this.hangoverRemaining = 0; + } +} diff --git a/src/services/streaming/index.ts b/src/services/streaming/index.ts index 6a6ee0e..bfac48e 100644 --- a/src/services/streaming/index.ts +++ b/src/services/streaming/index.ts @@ -1,2 +1,6 @@ export * from "./factory"; export * from "./service"; +export * from "./browser/dual-channel-capture"; +export * from "./energy-vad"; +export * from "./label-mapper"; +export * from "./resampler"; diff --git a/src/services/streaming/label-mapper.ts b/src/services/streaming/label-mapper.ts new file mode 100644 index 0000000..bea85a7 --- /dev/null +++ b/src/services/streaming/label-mapper.ts @@ -0,0 +1,128 @@ +import { StreamingWord, TurnEvent } from "../../types/streaming"; +import { Channel, VadFrame } from "../../types/streaming/dual-channel"; + +export type LabelMapperParams = { + /** Per-word energy ratio above which a channel is declared dominant. */ + dominanceRatio: number; +}; + +/** + * Append-only ring buffer of VAD frames in stream-relative ms order. + * `pushFrame` is O(1) amortized; `framesInWindow` is O(n) over kept frames, + * which is fine for the per-word lookups we do (a 30 s window at 50 frames/s + * per channel × 2 channels = 3000 entries, scanned once per word). + * + * Runtime-agnostic — no DOM or Web Audio dependencies. + */ +export class VadTimeline { + private frames: VadFrame[] = []; + private head = 0; + + constructor(private readonly windowMs: number) {} + + pushFrame(frame: VadFrame): void { + this.frames.push(frame); + const cutoff = frame.ts - this.windowMs; + while ( + this.head < this.frames.length && + this.frames[this.head].ts < cutoff + ) { + this.head++; + } + if (this.head > 1024 && this.head * 2 > this.frames.length) { + this.frames = this.frames.slice(this.head); + this.head = 0; + } + } + + framesInWindow(startMs: number, endMs: number): VadFrame[] { + const out: VadFrame[] = []; + for (let i = this.head; i < this.frames.length; i++) { + const f = this.frames[i]; + if (f.ts < startMs) continue; + if (f.ts > endMs) break; + out.push(f); + } + return out; + } + + clear(): void { + this.frames = []; + this.head = 0; + } +} + +/** + * Sum per-channel active RMS over a window. Returns a Map from channel name + * to total score. Channels with zero score are omitted. + */ +function scoreChannels(frames: VadFrame[]): Map { + const scores = new Map(); + for (const f of frames) { + if (!f.active) continue; + scores.set(f.channel, (scores.get(f.channel) ?? 0) + f.rms); + } + return scores; +} + +/** + * Decide which channel was dominant during a word's `[start, end]` window. + * + * - If no channel has any active VAD energy → `"unknown"`. + * - If the top channel beats the runner-up by at least `dominanceRatio` → top channel. + * - Else: top channel wins on absolute score; exact ties → `"unknown"`. + */ +export function attributeWord( + word: StreamingWord, + timeline: VadTimeline, + params: LabelMapperParams, +): Channel { + const scores = scoreChannels(timeline.framesInWindow(word.start, word.end)); + if (scores.size === 0) return "unknown"; + const sorted = [...scores.entries()].sort((a, b) => b[1] - a[1]); + if (sorted.length === 1) return sorted[0][0]; + const [topName, topScore] = sorted[0]; + const [runnerName, runnerScore] = sorted[1]; + if (topScore >= params.dominanceRatio * runnerScore) return topName; + if (topScore > runnerScore) return topName; + if (runnerScore > topScore) return runnerName; + return "unknown"; +} + +/** + * Duration-weighted majority of word channels. `"unknown"` if there are no + * words, every word resolved to `"unknown"`, or two channels tie exactly. + */ +export function rollUpTurnChannel(words: StreamingWord[]): Channel { + const totals = new Map(); + for (const w of words) { + if (!w.channel || w.channel === "unknown") continue; + const dur = Math.max(0, w.end - w.start); + totals.set(w.channel, (totals.get(w.channel) ?? 0) + dur); + } + if (totals.size === 0) return "unknown"; + const sorted = [...totals.entries()].sort((a, b) => b[1] - a[1]); + if (sorted.length === 1) return sorted[0][0]; + const [topName, topMs] = sorted[0]; + const [, runnerMs] = sorted[1]; + if (topMs === runnerMs) return "unknown"; + return topName; +} + +/** + * Mutate `turn` in place: write `turn.words[i].channel` for every word and set + * `turn.channel` to the duration-weighted rollup. + * + * Returns `void` because the transcriber owns the `TurnEvent` ref and forwards + * the same object to the customer listener — no need to allocate a copy. + */ +export function attributeTurn( + turn: TurnEvent, + timeline: VadTimeline, + params: LabelMapperParams, +): void { + for (const w of turn.words) { + w.channel = attributeWord(w, timeline, params); + } + turn.channel = rollUpTurnChannel(turn.words); +} diff --git a/src/services/streaming/resampler.ts b/src/services/streaming/resampler.ts new file mode 100644 index 0000000..f9e993f --- /dev/null +++ b/src/services/streaming/resampler.ts @@ -0,0 +1,69 @@ +/** + * Linear-interpolation resampler for streaming Float32 audio. Stateful across + * `process()` calls so chunk boundaries don't introduce phase discontinuities: + * the last input sample and a fractional read position are carried over. + * + * Linear interpolation is good enough for ASR ingest — the downstream + * StreamingTranscriber band-limits at the target rate anyway, and a polyphase + * filter would be overkill in the AudioWorklet hot path. If a customer needs + * higher quality they can supply their own VadDetector + bypass the encoder. + */ +export class LinearResampler { + private readonly ratio: number; + private lastSample = 0; + private fractional = 0; + + constructor( + private readonly sourceRate: number, + private readonly targetRate: number, + ) { + if (sourceRate <= 0 || targetRate <= 0) { + throw new Error("sourceRate and targetRate must be positive"); + } + this.ratio = sourceRate / targetRate; + } + + process(input: Float32Array): Float32Array { + if (this.sourceRate === this.targetRate) { + return input; + } + + // Worst-case output length; we'll slice to actual. + const out = new Float32Array(Math.ceil(input.length / this.ratio) + 1); + let outIdx = 0; + let pos = this.fractional; + + while (pos < input.length) { + const i = Math.floor(pos); + const frac = pos - i; + const a = i === 0 ? this.lastSample : input[i - 1]; + const b = input[i]; + out[outIdx++] = a + (b - a) * frac; + pos += this.ratio; + } + + this.lastSample = input[input.length - 1] ?? this.lastSample; + this.fractional = pos - input.length; + return out.subarray(0, outIdx); + } + + reset(): void { + this.lastSample = 0; + this.fractional = 0; + } +} + +/** Convert Float32 PCM (-1..1) to little-endian Int16 PCM. */ +export function float32ToPcm16(input: Float32Array): ArrayBuffer { + const out = new ArrayBuffer(input.length * 2); + const view = new DataView(out); + for (let i = 0; i < input.length; i++) { + const clamped = Math.max(-1, Math.min(1, input[i])); + view.setInt16( + i * 2, + clamped < 0 ? clamped * 0x8000 : clamped * 0x7fff, + true, + ); + } + return out; +} diff --git a/src/services/streaming/service.ts b/src/services/streaming/service.ts index 636c561..7930d47 100644 --- a/src/services/streaming/service.ts +++ b/src/services/streaming/service.ts @@ -6,6 +6,7 @@ import { import { ErrorEvent, MessageEvent, CloseEvent } from "ws"; import { conditions } from "#conditions"; import { + ChannelAttributionParams, StreamingEvents, StreamingListeners, StreamingTranscriberParams, @@ -18,12 +19,62 @@ import { StreamingForceEndpoint, WarningEvent, } from "../.."; +import type { VadDetector, VadFrame } from "../../types/streaming/dual-channel"; +import { EnergyVad } from "./energy-vad"; +import { attributeTurn, rollUpTurnChannel, VadTimeline } from "./label-mapper"; import { StreamingError, StreamingErrorMessages } from "../../utils/errors"; import { StreamingErrorTypeCodes } from "../../utils/errors/streaming"; +/** + * Options for `sendAudio`. In dual-channel mode (when `channels` is configured + * on the transcriber), `channel` is required and must match one of the declared + * channel names; in single-channel mode it is ignored. + */ +export type SendAudioOptions = { + channel?: string; +}; + +/** + * View any `AudioData` (ArrayBuffer / ArrayBufferView / typed array) as a + * little-endian Int16 sample sequence without copying. Callers must guarantee + * the underlying byte length is even. + */ +function toInt16View(audio: AudioData): Int16Array { + // AudioData is ArrayBufferLike per the public type, but in practice callers + // pass ArrayBuffer or a typed-array view. Handle both without copying. + if (audio instanceof Int16Array) return audio; + if (ArrayBuffer.isView(audio)) { + const view = audio as ArrayBufferView; + return new Int16Array( + view.buffer, + view.byteOffset, + Math.floor(view.byteLength / 2), + ); + } + return new Int16Array(audio as ArrayBuffer); +} + const defaultStreamingUrl = "wss://streaming.assemblyai.com/v3/ws"; const terminateSessionMessage = `{"type":"Terminate"}`; +/** + * Per-send chunk cap in milliseconds for the dual-channel mixer. The streaming + * server rejects audio messages longer than 1000 ms (`Input Duration Error`). + * If a backlog accumulates (e.g. when a browser tab is backgrounded and + * `setInterval` is throttled to ~1 Hz), `flushMix` loops and emits multiple + * sends each ≤ this cap until the buffers drain. + */ +const MAX_CHUNK_MS = 200; + +/** + * Per-send minimum chunk size in milliseconds. The streaming server also + * rejects audio messages shorter than 50 ms with the same + * `Input Duration Error`, so the mixer waits until both per-channel buffers + * have at least this much accumulated before emitting. Final-flush (close + * path) bypasses this floor so the trailing partial buffer still gets sent. + */ +const MIN_CHUNK_MS = 50; + type BufferLike = | string | Buffer @@ -51,6 +102,25 @@ export class StreamingTranscriber { private listeners: StreamingListeners = {}; private sessionTerminatedResolve?: () => void; + // Dual-channel mode state (allocated only when params.channels is set). + private isDualChannel = false; + private channelNames?: string[]; + private channelBuffers?: Map; + private channelSamplesReceived?: Map; + private channelVadFloatBuffers?: Map; + private channelVadBufferIdx?: Map; + private channelVads?: Map; + private timeline?: VadTimeline; + private flushTimer?: ReturnType; + private attributionParams?: Required; + private vadFrameSamples = 0; + private minChunkSamples = 0; + private maxChunkSamples = 0; + // For resolveUnknownChannelsMethod === "speaker-history": per-speaker_label + // cumulative active-VAD RMS per channel. Allocated only when that method is + // configured. + private speakerHistory?: Map>; + constructor(params: StreamingTranscriberParams) { this.params = { ...params, @@ -63,6 +133,58 @@ export class StreamingTranscriber { if (!(this.token || this.apiKey)) { throw new Error("API key or temporary token is required."); } + + if (params.channels) { + if (params.channels.length !== 2) { + throw new Error( + "StreamingTranscriber.channels must have exactly 2 entries.", + ); + } + const names = params.channels.map((c) => c.name); + if (new Set(names).size !== names.length) { + throw new Error("StreamingTranscriber.channels names must be unique."); + } + this.isDualChannel = true; + this.channelNames = names; + const att = params.channelAttribution ?? {}; + this.attributionParams = { + dominanceRatio: att.dominanceRatio ?? 4, + timelineWindowMs: att.timelineWindowMs ?? 30_000, + createVad: att.createVad ?? (() => new EnergyVad()), + flushIntervalMs: att.flushIntervalMs ?? 50, + resolveUnknownChannelsMethod: + att.resolveUnknownChannelsMethod ?? "window", + resolutionWindowWords: att.resolutionWindowWords ?? 2, + speakerHistoryMinRmsEvidence: att.speakerHistoryMinRmsEvidence ?? 0.5, + speakerHistoryDominanceRatio: att.speakerHistoryDominanceRatio ?? 3, + }; + if ( + this.attributionParams.resolveUnknownChannelsMethod === + "speaker-history" + ) { + this.speakerHistory = new Map(); + } + // 20 ms VAD frames at the transcriber's target sample rate. + this.vadFrameSamples = Math.max(1, Math.round(params.sampleRate * 0.02)); + this.minChunkSamples = Math.max( + 1, + Math.round(params.sampleRate * (MIN_CHUNK_MS / 1000)), + ); + this.maxChunkSamples = Math.max( + this.minChunkSamples, + Math.round(params.sampleRate * (MAX_CHUNK_MS / 1000)), + ); + this.channelBuffers = new Map(names.map((n) => [n, [] as number[]])); + this.channelSamplesReceived = new Map(names.map((n) => [n, 0])); + this.channelVadFloatBuffers = new Map( + names.map((n) => [n, new Float32Array(this.vadFrameSamples)]), + ); + this.channelVadBufferIdx = new Map(names.map((n) => [n, 0])); + this.channelVads = new Map( + names.map((n) => [n, this.attributionParams!.createVad(n)]), + ); + this.timeline = new VadTimeline(this.attributionParams.timelineWindowMs); + } } private connectionUrl(): URL { @@ -283,6 +405,7 @@ export class StreamingTranscriber { listener: (event: LLMGatewayResponseEvent) => void, ): void; on(event: "warning", listener: (event: WarningEvent) => void): void; + on(event: "vad", listener: (event: VadFrame) => void): void; on(event: "error", listener: (error: Error) => void): void; on(event: "close", listener: (code: number, reason: string) => void): void; // eslint-disable-next-line @typescript-eslint/no-explicit-any @@ -323,6 +446,13 @@ Learn more at https://github.com/AssemblyAI/assemblyai-node-sdk/blob/main/docs/c reason = StreamingErrorMessages[code as StreamingErrorTypeCodes]; } } + // Stop the flush timer when the socket is gone (server-initiated close, + // network drop, etc.) — otherwise subsequent ticks call send() on a + // closed socket and spam the error listener. + if (this.flushTimer) { + clearInterval(this.flushTimer); + this.flushTimer = undefined; + } this.listeners.close?.(code, reason); }; @@ -351,6 +481,22 @@ Learn more at https://github.com/AssemblyAI/assemblyai-node-sdk/blob/main/docs/c break; } case "Turn": { + if (this.isDualChannel && this.timeline && this.attributionParams) { + attributeTurn(message, this.timeline, { + dominanceRatio: this.attributionParams.dominanceRatio, + }); + switch (this.attributionParams.resolveUnknownChannelsMethod) { + case "window": + this.resolveUnknownChannelsByWindow(message); + break; + case "speaker-history": + this.resolveUnknownChannelsBySpeakerHistory(message); + break; + case "none": + // Leave "unknown" words as-is. + break; + } + } this.listeners.turn?.(message); break; } @@ -379,6 +525,11 @@ Learn more at https://github.com/AssemblyAI/assemblyai-node-sdk/blob/main/docs/c }); } + /** + * Returns a WritableStream that pumps PCM chunks into `sendAudio`. Single-channel + * only — in dual-channel mode use `sendAudio(pcm, { channel })` directly, since + * `WritableStream` has no place to carry a channel tag. + */ stream(): WritableStream { return new WritableStream({ write: (chunk: AudioData) => { @@ -387,8 +538,231 @@ Learn more at https://github.com/AssemblyAI/assemblyai-node-sdk/blob/main/docs/c }); } - sendAudio(audio: AudioData) { - this.send(audio); + /** + * Send PCM audio. + * + * In single-channel mode, `audio` is forwarded directly to the WebSocket and + * `options` is ignored. + * + * In dual-channel mode (when `channels` is configured), `options.channel` is + * REQUIRED and must match one of the declared channel names. Per-channel PCM is + * fed into that channel's VAD, accumulated into a per-channel ring buffer, and + * a scheduled flush (`channelAttribution.flushIntervalMs`, default 50ms) mixes + * the buffers into mono before sending to the WebSocket. + */ + sendAudio(audio: AudioData, options?: SendAudioOptions) { + if (!this.isDualChannel) { + this.send(audio); + return; + } + if (!options?.channel) { + throw new Error( + "StreamingTranscriber is in dual-channel mode; sendAudio requires { channel }.", + ); + } + if (!this.channelNames!.includes(options.channel)) { + throw new Error( + `Unknown channel "${options.channel}"; declared channels: ${this.channelNames!.join(", ")}.`, + ); + } + this.ingestChannelAudio(options.channel, audio); + } + + private ingestChannelAudio(name: string, audio: AudioData) { + const samples = toInt16View(audio); + const buf = this.channelBuffers!.get(name)!; + const vadBuf = this.channelVadFloatBuffers!.get(name)!; + let vadIdx = this.channelVadBufferIdx!.get(name)!; + let received = this.channelSamplesReceived!.get(name)!; + const vad = this.channelVads!.get(name)!; + const sampleRate = this.params.sampleRate; + const frameSize = this.vadFrameSamples; + + for (let i = 0; i < samples.length; i++) { + const s = samples[i]; + buf.push(s); + vadBuf[vadIdx++] = s / 0x8000; + received++; + if (vadIdx === frameSize) { + const result = vad.process(vadBuf); + const frame: VadFrame = { + ts: (received / sampleRate) * 1000, + channel: name, + active: result.active, + rms: result.energy, + }; + this.timeline!.pushFrame(frame); + this.listeners.vad?.(frame); + vadIdx = 0; + } + } + + this.channelVadBufferIdx!.set(name, vadIdx); + this.channelSamplesReceived!.set(name, received); + + if (!this.flushTimer) this.startFlushTimer(); + } + + private startFlushTimer() { + this.flushTimer = setInterval( + () => this.flushMix(), + this.attributionParams!.flushIntervalMs, + ); + } + + private flushMix(force = false) { + if (!this.channelNames || !this.channelBuffers) return; + const bufs = this.channelNames.map((n) => this.channelBuffers!.get(n)!); + const divisor = bufs.length; + // Loop so a backlog (e.g. accumulated while a browser tab was throttled in + // the background) drains as multiple sends, each capped at MAX_CHUNK_MS. + // Without the cap a single message could exceed the server's 1000 ms input + // duration limit and be rejected with code 3007. + for (;;) { + let mixLen = Infinity; + for (const b of bufs) if (b.length < mixLen) mixLen = b.length; + if (!Number.isFinite(mixLen) || mixLen === 0) return; + // The streaming server rejects audio messages shorter than 50 ms with + // `Input Duration Error`. Wait until both per-channel buffers have at + // least minChunkSamples worth queued before emitting. The `force` path + // (final flush on close) bypasses this so the trailing partial buffer + // still gets through. + if (!force && mixLen < this.minChunkSamples) return; + if (mixLen > this.maxChunkSamples) mixLen = this.maxChunkSamples; + const out = new Int16Array(mixLen); + for (let i = 0; i < mixLen; i++) { + let sum = 0; + for (let c = 0; c < divisor; c++) sum += bufs[c][i]; + const avg = Math.round(sum / divisor); + out[i] = avg < -32768 ? -32768 : avg > 32767 ? 32767 : avg; + } + for (const b of bufs) b.splice(0, mixLen); + try { + this.send(out.buffer); + } catch (err) { + this.listeners.error?.(err as Error); + return; + } + } + } + + /** + * Fill in words whose per-word VAD attribution was `"unknown"` by looking + * at the dominant non-`"unknown"` channel among ±N neighbors in the same + * turn. Words with no non-`"unknown"` neighbors stay `"unknown"`. Confident + * per-word VAD decisions are never modified. + * + * Local temporal heuristic — ignores `speaker_label`, so it works even when + * AAI's diarization re-uses the same label for two physically distinct + * voices. Each resolved word gets `channelResolved: true` so downstream + * renderers can distinguish inferred channels from directly-measured ones. + */ + private resolveUnknownChannelsByWindow(turn: TurnEvent): void { + if (!this.attributionParams) return; + const window = this.attributionParams.resolutionWindowWords; + const words = turn.words; + let mutated = false; + + for (let i = 0; i < words.length; i++) { + if (words[i].channel !== "unknown") continue; + const tally = new Map(); + const lo = Math.max(0, i - window); + const hi = Math.min(words.length - 1, i + window); + for (let j = lo; j <= hi; j++) { + if (j === i) continue; + const ch = words[j].channel; + if (!ch || ch === "unknown") continue; + tally.set(ch, (tally.get(ch) ?? 0) + 1); + } + if (tally.size === 0) continue; + + // Pick the dominant neighbor channel. Ties → leave `"unknown"` (rare; + // would require an equal count of mic and system neighbors). + let top: string | undefined; + let topCount = 0; + let tied = false; + for (const [name, count] of tally) { + if (count > topCount) { + top = name; + topCount = count; + tied = false; + } else if (count === topCount) { + tied = true; + } + } + if (top && !tied) { + words[i].channel = top; + words[i].channelResolved = true; + mutated = true; + } + } + + // Recompute the rollup only if any per-word channel changed. + if (mutated) turn.channel = rollUpTurnChannel(words); + } + + /** + * Fill `"unknown"` words by looking up the speaker's session-wide channel + * evidence. For each `speaker_label`, sums active VAD frame RMS per channel + * across every word the speaker has uttered to date. A speaker is + * "resolvable" if their total evidence clears + * `speakerHistoryMinRmsEvidence` and their top channel exceeds the + * runner-up by `speakerHistoryDominanceRatio`. + * + * Only touches `"unknown"` words. Confident per-word VAD decisions are + * never modified. `speaker_label` is never modified. + */ + private resolveUnknownChannelsBySpeakerHistory(turn: TurnEvent): void { + if (!this.timeline || !this.attributionParams || !this.speakerHistory) + return; + const minEvidence = this.attributionParams.speakerHistoryMinRmsEvidence; + const dominanceRatio = this.attributionParams.speakerHistoryDominanceRatio; + + // 1. Accumulate evidence from this turn's words. + for (const w of turn.words) { + if (!w.speaker) continue; + const frames = this.timeline.framesInWindow(w.start, w.end); + let entry = this.speakerHistory.get(w.speaker); + if (!entry) { + entry = new Map(); + this.speakerHistory.set(w.speaker, entry); + } + for (const f of frames) { + if (!f.active) continue; + entry.set(f.channel, (entry.get(f.channel) ?? 0) + f.rms); + } + } + + // 2. Fill unknown words whose speakers have dominant evidence. + let mutated = false; + for (const w of turn.words) { + if (w.channel !== "unknown" || !w.speaker) continue; + const entry = this.speakerHistory.get(w.speaker); + if (!entry || entry.size === 0) continue; + let total = 0; + let topName: string | undefined; + let topScore = 0; + let runnerScore = 0; + for (const [name, score] of entry) { + total += score; + if (score > topScore) { + runnerScore = topScore; + topScore = score; + topName = name; + } else if (score > runnerScore) { + runnerScore = score; + } + } + if (total < minEvidence) continue; + if (runnerScore > 0 && topScore < dominanceRatio * runnerScore) continue; + if (topName) { + w.channel = topName; + w.channelResolved = true; + mutated = true; + } + } + + if (mutated) turn.channel = rollUpTurnChannel(turn.words); } /** @@ -440,6 +814,15 @@ Learn more at https://github.com/AssemblyAI/assemblyai-node-sdk/blob/main/docs/c } async close(waitForSessionTermination = true) { + if (this.flushTimer) { + clearInterval(this.flushTimer); + this.flushTimer = undefined; + // Best-effort: drain any final partial mix so the server gets the tail. + // Bypass the 50ms floor here since this is the last flush; if the tail + // is <50ms the server will reject that single message, but we'd lose + // the audio either way. + this.flushMix(true); + } if (this.socket) { if (this.socket.readyState === this.socket.OPEN) { if (waitForSessionTermination) { diff --git a/src/types/streaming/dual-channel.ts b/src/types/streaming/dual-channel.ts new file mode 100644 index 0000000..a63e0e6 --- /dev/null +++ b/src/types/streaming/dual-channel.ts @@ -0,0 +1,57 @@ +/** + * Physical input channel that a word/turn was attributed to. + * - A channel name declared in `StreamingTranscriberParams.channels` (e.g. `"mic"`, `"system"`). + * - `"unknown"`: no channel was clearly dominant during the word's time window (silent + * or all channels evenly active under our threshold). + * + * This is independent of AssemblyAI's diarization `speaker_label` / `words[i].speaker`, + * which identifies voices by acoustic characteristics. A given speaker_label can map + * to any physical channel; the two dimensions can disagree. + */ +export type Channel = string | "unknown"; + +/** + * Per-channel, per-frame VAD observation emitted by `StreamingTranscriber` when running + * in dual-channel mode. `ts` is stream-relative milliseconds, derived from the + * per-channel sample counter — the same reference frame as `StreamingWord.start` / + * `.end`, so per-word lookups need no conversion. + */ +export type VadFrame = { + ts: number; + channel: string; + active: boolean; + rms: number; +}; + +export type VadDetectorResult = { + active: boolean; + energy: number; +}; + +/** + * Pluggable per-channel voice-activity detector. The default `EnergyVad` is energy-based + * with an adaptive noise-floor threshold; callers can drop in a DNN-backed detector + * (e.g. Silero via `@ricky0123/vad-web`) for noisier environments. + * + * A separate `VadDetector` instance is held per channel; do not assume cross-channel + * state. Frames are fixed-size at the transcriber's target sample rate. + */ +export interface VadDetector { + process(frame: Float32Array): VadDetectorResult; + reset(): void; +} + +/** + * Thrown when `DualChannelCapture` is constructed in a non-browser environment + * (no `globalThis.AudioContext`). The helper is intentionally surfaced from the + * main entrypoint so the import path is uniform across runtimes; the runtime + * guard moves to construction time. + */ +export class BrowserOnlyError extends Error { + constructor( + message = "DualChannelCapture requires a browser environment (AudioContext is undefined).", + ) { + super(message); + this.name = "BrowserOnlyError"; + } +} diff --git a/src/types/streaming/index.ts b/src/types/streaming/index.ts index 3f5f2e9..0eb8731 100644 --- a/src/types/streaming/index.ts +++ b/src/types/streaming/index.ts @@ -1,4 +1,65 @@ import { AudioEncoding } from ".."; +import type { Channel, VadDetector, VadFrame } from "./dual-channel"; + +export * from "./dual-channel"; + +/** + * Per-channel attribution tuning for dual-channel mode. All fields optional; + * ignored when `StreamingTranscriberParams.channels` is not set. + */ +export type ChannelAttributionParams = { + /** Energy ratio above which a channel is declared dominant for a word. Default 4. */ + dominanceRatio?: number; + /** Rolling VAD timeline window in ms. Default 30_000. */ + timelineWindowMs?: number; + /** + * Factory for the per-channel VAD detector. Called once per declared channel + * at transcriber construction time. The channel name is passed so factories + * that wrap higher-level VAD libraries (which manage their own audio source) + * can map each `VadDetector` instance to its corresponding channel. + */ + createVad?: (channelName: string) => VadDetector; + /** Mix flush interval in ms — how often per-channel buffers are summed and sent. Default 50. */ + flushIntervalMs?: number; + /** + * Strategy used to fill words whose per-word VAD attribution resolved to + * `"unknown"`. Confident per-word VAD decisions (`"mic"` / `"system"`) are + * never modified by any strategy. + * + * - `"window"` (default): look at the dominant non-`"unknown"` channel + * among ±`resolutionWindowWords` neighboring words in the same turn. + * Ignores `speaker_label`, so it works even when AAI re-uses a label for + * two physically distinct voices. + * - `"speaker-history"`: accumulate per-`speaker_label` per-channel active + * VAD energy across the session, then fill `"unknown"` words with the + * speaker's dominant channel when it clears + * `speakerHistoryMinRmsEvidence` and beats runner-up by + * `speakerHistoryDominanceRatio`. Robust for stable speaker labels but + * does nothing when a speaker has split evidence. + * - `"none"`: disable resolution; `"unknown"` words remain `"unknown"` in + * the output. + */ + resolveUnknownChannelsMethod?: "none" | "window" | "speaker-history"; + /** + * Half-window (in words) on each side of an `"unknown"` word for the + * `"window"` method. Default 2 — so the full window is up to 5 words + * (2 before + the unknown + 2 after). + */ + resolutionWindowWords?: number; + /** + * Minimum cumulative active-RMS evidence (sum across all the speaker's + * frames to date) before a speaker can be resolved via the + * `"speaker-history"` method. Default 0.5 — roughly a few seconds of + * sustained speech. + */ + speakerHistoryMinRmsEvidence?: number; + /** + * For the `"speaker-history"` method, the top channel's evidence must + * exceed the runner-up's by at least this factor for the speaker to be + * considered pinned to that channel. Default 3. + */ + speakerHistoryDominanceRatio?: number; +}; export type LLMGatewayMessage = { role: string; @@ -50,6 +111,33 @@ export type StreamingTranscriberParams = { webhookUrl?: string; webhookAuthHeaderName?: string; webhookAuthHeaderValue?: string; + /** + * Enable dual-channel (or N-channel) mode. Presence of `channels` switches the + * transcriber into channel-tagged mode: `sendAudio(audio, { channel })` is required, + * per-channel VAD runs on the raw PCM, the streams are mixed to mono before being + * sent to the server, and emitted `TurnEvent`s are enriched with `channel` and + * per-word `channel` attribution. + * + * Must contain exactly 2 entries with unique names. The names are echoed back in + * `TurnEvent.channel` / `words[i].channel`. + * + * **Acoustic-leak caveat.** Per-word channel attribution uses energy-based + * VAD on each channel. If your capture setup lets one channel's audio bleed + * into another at similar amplitude — typically system audio playing + * through speakers and being picked up by an open mic — attribution can + * misfire (mic-tagged words that were actually system). Transcription + * quality is unaffected; only the `channel` field is. To preserve + * attribution in speaker-leak setups, apply echo cancellation at capture + * before feeding audio to the SDK. In browsers, that's + * `getUserMedia({ audio: { echoCancellation: true } })`. On macOS native, + * `AVAudioEngine.setVoiceProcessingEnabled(true)` on the input node. If + * platform-level AEC isn't available, swap in a DNN VAD (e.g. Silero) via + * `channelAttribution.createVad`. See the dual-channel sample app's + * README for worked examples. + */ + channels?: Array<{ name: string }>; + /** Tuning for dual-channel attribution. Ignored when `channels` is unset. */ + channelAttribution?: ChannelAttributionParams; }; export type StreamingEvents = @@ -59,6 +147,7 @@ export type StreamingEvents = | "speechStarted" | "llmGatewayResponse" | "warning" + | "vad" | "error"; export type StreamingListeners = { @@ -68,6 +157,7 @@ export type StreamingListeners = { speechStarted?: (event: SpeechStartedEvent) => void; llmGatewayResponse?: (event: LLMGatewayResponseEvent) => void; warning?: (event: WarningEvent) => void; + vad?: (event: VadFrame) => void; error?: (error: Error) => void; }; @@ -186,6 +276,12 @@ export type TurnEvent = { language_code?: string; language_confidence?: number; speaker_label?: string; + /** + * Duration-weighted majority channel across `words[i].channel`. Populated only + * when the transcriber is configured with `channels`. Independent from + * `speaker_label`. + */ + channel?: Channel; }; export type StreamingWord = { @@ -195,6 +291,20 @@ export type StreamingWord = { text: string; word_is_final: boolean; speaker?: string; + /** + * Physical input channel attributed by client-side VAD during this word's + * time window. Populated only when the transcriber is configured with + * `channels`. Independent from `speaker`. + */ + channel?: Channel; + /** + * True if `channel` was filled in by `channelAttribution.resolveUnknownChannelsMethod` + * rather than by the per-word VAD. Only set on words whose per-word VAD + * attribution was `"unknown"` and whose resolution method produced a + * confident channel. Useful for debugging or rendering an indicator that a + * word's channel came from context, not direct VAD evidence. + */ + channelResolved?: boolean; }; export type TerminationEvent = { diff --git a/tests/unit/dual-channel-browser-guard.test.ts b/tests/unit/dual-channel-browser-guard.test.ts new file mode 100644 index 0000000..dafd796 --- /dev/null +++ b/tests/unit/dual-channel-browser-guard.test.ts @@ -0,0 +1,19 @@ +import { + BrowserOnlyError, + DualChannelCapture, +} from "../../src/services/streaming/browser/dual-channel-capture"; +import { StreamingTranscriber } from "../../src/services/streaming/service"; + +describe("DualChannelCapture in non-browser env", () => { + it("throws BrowserOnlyError when AudioContext is missing", () => { + // jest runs with testEnvironment: "node", so globalThis.AudioContext is undefined. + expect( + () => + new DualChannelCapture({ + micStream: undefined as unknown as MediaStream, + systemStream: undefined as unknown as MediaStream, + transcriber: undefined as unknown as StreamingTranscriber, + }), + ).toThrow(BrowserOnlyError); + }); +}); diff --git a/tests/unit/dual-channel-energy-vad.test.ts b/tests/unit/dual-channel-energy-vad.test.ts new file mode 100644 index 0000000..5b38797 --- /dev/null +++ b/tests/unit/dual-channel-energy-vad.test.ts @@ -0,0 +1,53 @@ +import { EnergyVad } from "../../src/services/streaming/energy-vad"; + +const FRAME = 320; + +function silence(): Float32Array { + return new Float32Array(FRAME); +} + +function tone(amp: number): Float32Array { + const f = new Float32Array(FRAME); + for (let i = 0; i < FRAME; i++) f[i] = amp * Math.sin((2 * Math.PI * i) / 16); + return f; +} + +describe("EnergyVad", () => { + it("reports silent frames as inactive after the noise floor adapts", () => { + const vad = new EnergyVad({ initialNoiseFloor: 1e-4, hangoverFrames: 0 }); + // Warm up noise floor with silence. + for (let i = 0; i < 5; i++) vad.process(silence()); + expect(vad.process(silence()).active).toBe(false); + }); + + it("triggers on a tone significantly above the noise floor", () => { + const vad = new EnergyVad({ initialNoiseFloor: 1e-4, hangoverFrames: 0 }); + for (let i = 0; i < 5; i++) vad.process(silence()); + expect(vad.process(tone(0.5)).active).toBe(true); + }); + + it("holds active state through hangover frames", () => { + const vad = new EnergyVad({ initialNoiseFloor: 1e-4, hangoverFrames: 3 }); + for (let i = 0; i < 5; i++) vad.process(silence()); + expect(vad.process(tone(0.5)).active).toBe(true); + expect(vad.process(silence()).active).toBe(true); + expect(vad.process(silence()).active).toBe(true); + expect(vad.process(silence()).active).toBe(true); + expect(vad.process(silence()).active).toBe(false); + }); + + it("reset() restores initial state", () => { + const vad = new EnergyVad({ initialNoiseFloor: 1e-4, hangoverFrames: 5 }); + vad.process(tone(0.9)); + expect(vad.process(silence()).active).toBe(true); // in hangover + vad.reset(); + expect(vad.process(silence()).active).toBe(false); + }); + + it("returns RMS energy alongside the activity decision", () => { + const vad = new EnergyVad(); + const result = vad.process(tone(0.5)); + expect(result.energy).toBeGreaterThan(0); + expect(result.energy).toBeLessThan(1); + }); +}); diff --git a/tests/unit/dual-channel-label-mapper.test.ts b/tests/unit/dual-channel-label-mapper.test.ts new file mode 100644 index 0000000..3f5504d --- /dev/null +++ b/tests/unit/dual-channel-label-mapper.test.ts @@ -0,0 +1,227 @@ +import { StreamingWord, TurnEvent } from "../../src"; +import { + attributeTurn, + attributeWord, + rollUpTurnChannel, + VadTimeline, +} from "../../src/services/streaming/label-mapper"; +import { VadFrame } from "../../src/types/streaming/dual-channel"; + +function frame( + ts: number, + channel: string, + active: boolean, + rms = active ? 0.5 : 0, +): VadFrame { + return { ts, channel, active, rms }; +} + +function buildTimeline(frames: VadFrame[]): VadTimeline { + const t = new VadTimeline(60_000); + for (const f of frames) t.pushFrame(f); + return t; +} + +describe("attributeWord", () => { + const params = { dominanceRatio: 4 }; + + it("returns 'unknown' for a window with no VAD frames", () => { + const t = new VadTimeline(60_000); + const ch = attributeWord( + { start: 0, end: 200, confidence: 1, text: "hi", word_is_final: true }, + t, + params, + ); + expect(ch).toBe("unknown"); + }); + + it("returns 'mic' when only mic frames are active in the window", () => { + const t = buildTimeline([ + frame(10, "mic", true), + frame(30, "mic", true), + frame(50, "mic", true), + ]); + const ch = attributeWord( + { start: 0, end: 100, confidence: 1, text: "hello", word_is_final: true }, + t, + params, + ); + expect(ch).toBe("mic"); + }); + + it("returns 'system' when only system frames are active", () => { + const t = buildTimeline([ + frame(10, "system", true), + frame(30, "system", true), + ]); + const ch = attributeWord( + { start: 0, end: 100, confidence: 1, text: "ok", word_is_final: true }, + t, + params, + ); + expect(ch).toBe("system"); + }); + + it("returns the higher-scoring channel when ratio threshold not met", () => { + const t = buildTimeline([ + frame(10, "mic", true, 0.5), + frame(20, "system", true, 0.4), + ]); + const ch = attributeWord( + { start: 0, end: 100, confidence: 1, text: "x", word_is_final: true }, + t, + params, + ); + expect(ch).toBe("mic"); + }); + + it("returns 'unknown' on an exact tie", () => { + const t = buildTimeline([ + frame(10, "mic", true, 0.5), + frame(20, "system", true, 0.5), + ]); + const ch = attributeWord( + { start: 0, end: 100, confidence: 1, text: "x", word_is_final: true }, + t, + params, + ); + expect(ch).toBe("unknown"); + }); + + it("ignores inactive frames", () => { + const t = buildTimeline([ + frame(10, "mic", false, 0.9), + frame(20, "system", true, 0.1), + ]); + const ch = attributeWord( + { start: 0, end: 100, confidence: 1, text: "x", word_is_final: true }, + t, + params, + ); + expect(ch).toBe("system"); + }); + + it("ignores frames outside the [start, end] window", () => { + const t = buildTimeline([ + frame(10, "system", true), + frame(150, "mic", true), + frame(500, "system", true), + ]); + const ch = attributeWord( + { start: 100, end: 200, confidence: 1, text: "x", word_is_final: true }, + t, + params, + ); + expect(ch).toBe("mic"); + }); +}); + +describe("rollUpTurnChannel", () => { + function w( + channel: string | undefined, + start: number, + end: number, + ): StreamingWord { + return { + start, + end, + confidence: 1, + text: "x", + word_is_final: true, + channel, + }; + } + + it("returns 'unknown' for an empty word list", () => { + expect(rollUpTurnChannel([])).toBe("unknown"); + }); + + it("returns 'unknown' when all words are unknown", () => { + expect(rollUpTurnChannel([w("unknown", 0, 100)])).toBe("unknown"); + }); + + it("returns 'unknown' when all words are missing a channel", () => { + expect(rollUpTurnChannel([w(undefined, 0, 100)])).toBe("unknown"); + }); + + it("returns the duration-dominant channel", () => { + expect( + rollUpTurnChannel([w("mic", 0, 1000), w("system", 1000, 1100)]), + ).toBe("mic"); + }); + + it("returns 'unknown' on equal mic/system durations", () => { + expect(rollUpTurnChannel([w("mic", 0, 500), w("system", 500, 1000)])).toBe( + "unknown", + ); + }); + + it("ignores unknown-channel words in the rollup vote", () => { + expect( + rollUpTurnChannel([w("unknown", 0, 10_000), w("mic", 10_000, 10_200)]), + ).toBe("mic"); + }); +}); + +describe("attributeTurn", () => { + it("mutates the turn in place: sets channel on each word and rolls up", () => { + const t = buildTimeline([ + frame(10, "mic", true), + frame(150, "system", true), + ]); + const turn: TurnEvent = { + type: "Turn", + turn_order: 1, + turn_is_formatted: false, + end_of_turn: true, + transcript: "hi there", + end_of_turn_confidence: 0.9, + speaker_label: "A", + words: [ + { + start: 0, + end: 100, + confidence: 1, + text: "hi", + word_is_final: true, + speaker: "A", + }, + { + start: 100, + end: 200, + confidence: 1, + text: "there", + word_is_final: true, + speaker: "A", + }, + ], + }; + attributeTurn(turn, t, { dominanceRatio: 4 }); + expect(turn.speaker_label).toBe("A"); + expect(turn.words[0].speaker).toBe("A"); + expect(turn.words[0].channel).toBe("mic"); + expect(turn.words[1].channel).toBe("system"); + expect(turn.channel).toBe("unknown"); // equal duration → tie → unknown + }); +}); + +describe("VadTimeline window behavior", () => { + it("drops frames outside the rolling window", () => { + const t = new VadTimeline(100); + t.pushFrame(frame(0, "mic", true)); + t.pushFrame(frame(50, "mic", true)); + t.pushFrame(frame(150, "mic", true)); + t.pushFrame(frame(200, "system", true)); + const inWindow = t.framesInWindow(0, 1000); + expect(inWindow.map((f) => f.ts)).toEqual([150, 200]); + }); + + it("keeps everything inside the rolling window", () => { + const t = new VadTimeline(1000); + t.pushFrame(frame(0, "mic", true)); + t.pushFrame(frame(500, "mic", true)); + t.pushFrame(frame(900, "mic", true)); + const inWindow = t.framesInWindow(0, 1000); + expect(inWindow.length).toBe(3); + }); +}); diff --git a/tests/unit/dual-channel-resampler.test.ts b/tests/unit/dual-channel-resampler.test.ts new file mode 100644 index 0000000..a606d06 --- /dev/null +++ b/tests/unit/dual-channel-resampler.test.ts @@ -0,0 +1,61 @@ +import { + float32ToPcm16, + LinearResampler, +} from "../../src/services/streaming/resampler"; + +describe("LinearResampler", () => { + it("is identity when source == target", () => { + const r = new LinearResampler(16000, 16000); + const input = new Float32Array([0.1, 0.2, 0.3, 0.4]); + const out = r.process(input); + expect(out.length).toBe(4); + for (let i = 0; i < 4; i++) expect(out[i]).toBeCloseTo(input[i], 5); + }); + + it("approximately halves length when down-sampling 2:1", () => { + const r = new LinearResampler(48000, 24000); + const input = new Float32Array(960); + for (let i = 0; i < input.length; i++) input[i] = i / 960; + const out = r.process(input); + expect(out.length).toBeGreaterThan(479); + expect(out.length).toBeLessThan(482); + }); + + it("preserves continuity across chunk boundaries", () => { + const r = new LinearResampler(48000, 16000); + const chunkA = new Float32Array(480); + const chunkB = new Float32Array(480); + for (let i = 0; i < 480; i++) { + chunkA[i] = i; + chunkB[i] = 480 + i; + } + const a = r.process(chunkA); + const b = r.process(chunkB); + expect(b.length).toBeGreaterThan(0); + // The first output of chunkB should be near the last input of chunkA + // (continuity), not jump back to 0. + expect(b[0]).toBeGreaterThan(a[a.length - 1]); + }); + + it("rejects non-positive sample rates", () => { + expect(() => new LinearResampler(0, 16000)).toThrow(); + expect(() => new LinearResampler(48000, -1)).toThrow(); + }); +}); + +describe("float32ToPcm16", () => { + it("clamps and packs to little-endian Int16", () => { + const buf = float32ToPcm16(new Float32Array([0, 1, -1, 2, -2])); + const view = new DataView(buf); + expect(view.getInt16(0, true)).toBe(0); + expect(view.getInt16(2, true)).toBe(0x7fff); + expect(view.getInt16(4, true)).toBe(-0x8000); + expect(view.getInt16(6, true)).toBe(0x7fff); // clamped from 2 + expect(view.getInt16(8, true)).toBe(-0x8000); // clamped from -2 + }); + + it("returns an empty buffer for empty input", () => { + const buf = float32ToPcm16(new Float32Array(0)); + expect(buf.byteLength).toBe(0); + }); +}); diff --git a/tests/unit/streaming-dual-channel.test.ts b/tests/unit/streaming-dual-channel.test.ts new file mode 100644 index 0000000..8293eb1 --- /dev/null +++ b/tests/unit/streaming-dual-channel.test.ts @@ -0,0 +1,864 @@ +jest.mock("ws", () => require("./mocks/ws")); + +import WS from "jest-websocket-mock"; +import fetchMock from "jest-fetch-mock"; +import { StreamingTranscriber, TurnEvent, VadFrame } from "../../src"; + +fetchMock.enableMocks(); + +const websocketBaseUrl = "wss://localhost:4242/v3/ws"; + +const sessionBeginsMessage = { + type: "Begin", + id: "abc", + expires_at: 1, +}; + +const sessionTerminatedMessage = { + type: "Termination", +}; + +async function connect(rt: StreamingTranscriber, server: WS): Promise { + const p = rt.connect(); + await server.connected; + server.send(JSON.stringify(sessionBeginsMessage)); + await p; +} + +async function teardown(rt: StreamingTranscriber, server: WS): Promise { + const p = rt.close(); + server.send(JSON.stringify(sessionTerminatedMessage)); + await p; + await server.closed; + WS.clean(); +} + +/** Build a 20ms-frame-aligned loud Int16 PCM buffer at the given amplitude. */ +function loudPcm(samples: number, amplitude = 20_000): ArrayBuffer { + const buf = new Int16Array(samples); + for (let i = 0; i < samples; i++) { + buf[i] = i % 2 === 0 ? amplitude : -amplitude; + } + return buf.buffer; +} + +/** Build a silent Int16 PCM buffer (all zeros). */ +function silentPcm(samples: number): ArrayBuffer { + return new Int16Array(samples).buffer; +} + +describe("StreamingTranscriber constructor validation (dual-channel)", () => { + it("throws when channels has the wrong arity", () => { + expect( + () => + new StreamingTranscriber({ + token: "t", + sampleRate: 16_000, + speechModel: "u3-rt-pro", + channels: [{ name: "only" }], + }), + ).toThrow(/exactly 2/); + }); + + it("throws when channel names are not unique", () => { + expect( + () => + new StreamingTranscriber({ + token: "t", + sampleRate: 16_000, + speechModel: "u3-rt-pro", + channels: [{ name: "dup" }, { name: "dup" }], + }), + ).toThrow(/unique/); + }); +}); + +describe("StreamingTranscriber single-channel backward compat", () => { + let server: WS; + let rt: StreamingTranscriber; + + beforeEach(async () => { + server = new WS(websocketBaseUrl); + rt = new StreamingTranscriber({ + websocketBaseUrl, + token: "t", + sampleRate: 16_000, + speechModel: "u3-rt-pro", + }); + await connect(rt, server); + }); + + afterEach(async () => { + await teardown(rt, server); + }); + + it("sendAudio(buf) forwards directly to ws (no channel needed)", async () => { + const payload = silentPcm(160); + rt.sendAudio(payload); + await expect(server).toReceiveMessage(payload); + }); + + it("sendAudio(buf, { channel }) is accepted and ignored in single-channel mode", async () => { + const payload = silentPcm(160); + rt.sendAudio(payload, { channel: "anything" }); + await expect(server).toReceiveMessage(payload); + }); +}); + +describe("StreamingTranscriber dual-channel sendAudio validation", () => { + let server: WS; + let rt: StreamingTranscriber; + + beforeEach(async () => { + server = new WS(websocketBaseUrl); + rt = new StreamingTranscriber({ + websocketBaseUrl, + token: "t", + sampleRate: 16_000, + speechModel: "u3-rt-pro", + channels: [{ name: "mic" }, { name: "system" }], + }); + await connect(rt, server); + }); + + afterEach(async () => { + await teardown(rt, server); + }); + + it("throws when channel is missing in dual-channel mode", () => { + expect(() => rt.sendAudio(silentPcm(16))).toThrow( + /dual-channel mode; sendAudio requires/, + ); + }); + + it("throws on unknown channel name", () => { + expect(() => rt.sendAudio(silentPcm(16), { channel: "bogus" })).toThrow( + /Unknown channel "bogus"/, + ); + }); +}); + +describe("StreamingTranscriber dual-channel mixing and VAD", () => { + let server: WS; + let rt: StreamingTranscriber; + + beforeEach(async () => { + server = new WS(websocketBaseUrl); + rt = new StreamingTranscriber({ + websocketBaseUrl, + token: "t", + sampleRate: 16_000, + speechModel: "u3-rt-pro", + channels: [{ name: "mic" }, { name: "system" }], + channelAttribution: { flushIntervalMs: 50 }, + }); + await connect(rt, server); + }); + + afterEach(async () => { + await teardown(rt, server); + }); + + it("flushes mixed mono PCM at flushIntervalMs cadence", async () => { + // 800 samples = 50ms at 16kHz on each channel — exactly the minimum the + // server accepts per audio message. Both channels send the same amount, + // so the next flush emits 800 Int16 samples = 1600 bytes of mono PCM. + rt.sendAudio(loudPcm(800), { channel: "mic" }); + rt.sendAudio(silentPcm(800), { channel: "system" }); + const msg = (await server.nextMessage) as ArrayBuffer; + expect(msg.byteLength).toBe(800 * 2); + }); + + it("emits a 'vad' event per 20ms frame as PCM is ingested", () => { + const vadFrames: VadFrame[] = []; + rt.on("vad", (f) => vadFrames.push(f)); + // 320 samples = exactly one 20ms VAD frame at 16kHz. + rt.sendAudio(loudPcm(320), { channel: "mic" }); + rt.sendAudio(silentPcm(320), { channel: "system" }); + expect(vadFrames.length).toBe(2); + const channels = vadFrames.map((f) => f.channel).sort(); + expect(channels).toEqual(["mic", "system"]); + const mic = vadFrames.find((f) => f.channel === "mic")!; + expect(mic.rms).toBeGreaterThan(0); + expect(mic.ts).toBeCloseTo(20, 1); // 320 / 16000 * 1000 = 20ms + }); + + it("attributes a Turn message based on per-channel VAD energy", async () => { + const received: TurnEvent[] = []; + rt.on("turn", (t) => received.push(t)); + + // Drive ~200ms of loud mic audio + matching silence on system. + // Energy VAD with default thresholdRatio=3 takes a few frames to "lock on"; + // 10 frames (200ms) is enough to push energy well above the adaptive floor. + for (let i = 0; i < 10; i++) { + rt.sendAudio(loudPcm(320), { channel: "mic" }); + rt.sendAudio(silentPcm(320), { channel: "system" }); + } + + const turn: TurnEvent = { + type: "Turn", + turn_order: 1, + turn_is_formatted: false, + end_of_turn: true, + transcript: "hello", + end_of_turn_confidence: 0.9, + speaker_label: "A", + words: [ + { + start: 20, + end: 200, + confidence: 1, + text: "hello", + word_is_final: true, + speaker: "A", + }, + ], + }; + server.send(JSON.stringify(turn)); + + expect(received).toHaveLength(1); + const got = received[0]; + expect(got.speaker_label).toBe("A"); + expect(got.words[0].speaker).toBe("A"); + expect(got.words[0].channel).toBe("mic"); + expect(got.channel).toBe("mic"); + }); + + it("mixes per-channel PCM with /channelCount averaging", async () => { + // 800 samples each (50ms floor): mic constant +10000, system constant + // -10000. Average per sample = 0; mixed mono should be all zeros. + const micBuf = new Int16Array(800); + const sysBuf = new Int16Array(800); + for (let i = 0; i < 800; i++) { + micBuf[i] = 10_000; + sysBuf[i] = -10_000; + } + rt.sendAudio(micBuf.buffer, { channel: "mic" }); + rt.sendAudio(sysBuf.buffer, { channel: "system" }); + const msg = (await server.nextMessage) as ArrayBuffer; + const mixed = new Int16Array(msg); + expect(mixed.length).toBe(800); + for (let i = 0; i < mixed.length; i++) { + expect(mixed[i]).toBe(0); + } + + // Now: both channels at +20000 → mix should be +20000 (avg, not sum). + const both = new Int16Array(800); + for (let i = 0; i < 800; i++) both[i] = 20_000; + rt.sendAudio(both.buffer, { channel: "mic" }); + rt.sendAudio(both.buffer, { channel: "system" }); + const msg2 = (await server.nextMessage) as ArrayBuffer; + const mixed2 = new Int16Array(msg2); + for (let i = 0; i < mixed2.length; i++) { + expect(mixed2[i]).toBe(20_000); + } + }); + + it("only flushes min(channel lengths); the longer channel retains its tail", async () => { + // Mic: 1600 samples, system: 800 samples (both ≥ the 50ms floor). First + // flush should emit 800 mixed samples; the remaining 800 mic samples wait + // for system to catch up. + rt.sendAudio(loudPcm(1600), { channel: "mic" }); + rt.sendAudio(silentPcm(800), { channel: "system" }); + const first = (await server.nextMessage) as ArrayBuffer; + expect(first.byteLength).toBe(800 * 2); + + // Now feed 800 more on system → second flush of 800 mixed samples. + rt.sendAudio(silentPcm(800), { channel: "system" }); + const second = (await server.nextMessage) as ArrayBuffer; + expect(second.byteLength).toBe(800 * 2); + }); + + it("aggregates samples across sendAudio calls into a single VAD frame", () => { + const vadFrames: VadFrame[] = []; + rt.on("vad", (f) => vadFrames.push(f)); + // Feed mic in two halves of 160 samples — should still produce exactly one + // 20ms VAD frame once the second half lands. + rt.sendAudio(loudPcm(160), { channel: "mic" }); + expect(vadFrames.filter((f) => f.channel === "mic")).toHaveLength(0); + rt.sendAudio(loudPcm(160), { channel: "mic" }); + const mic = vadFrames.filter((f) => f.channel === "mic"); + expect(mic).toHaveLength(1); + expect(mic[0].ts).toBeCloseTo(20, 1); // 320 / 16000 * 1000 + }); + + it("withholds sub-50ms flushes until enough audio accumulates", async () => { + // 320 samples = 20ms — below the server's 50ms floor. The mixer should + // hold them in the per-channel buffers and NOT emit a message yet. + rt.sendAudio(loudPcm(320), { channel: "mic" }); + rt.sendAudio(silentPcm(320), { channel: "system" }); + // Top each channel up to exactly 50ms (320 + 480 = 800 samples). Now the + // floor is met and the next flush emits 800 samples — and only those 800. + rt.sendAudio(loudPcm(480), { channel: "mic" }); + rt.sendAudio(silentPcm(480), { channel: "system" }); + + const msg = (await server.nextMessage) as ArrayBuffer; + expect(msg.byteLength).toBe(800 * 2); + }); + + it("produces multiple sequential flushes as new PCM arrives", async () => { + rt.sendAudio(loudPcm(800), { channel: "mic" }); + rt.sendAudio(silentPcm(800), { channel: "system" }); + const a = (await server.nextMessage) as ArrayBuffer; + expect(a.byteLength).toBe(800 * 2); + + rt.sendAudio(loudPcm(800), { channel: "mic" }); + rt.sendAudio(silentPcm(800), { channel: "system" }); + const b = (await server.nextMessage) as ArrayBuffer; + expect(b.byteLength).toBe(800 * 2); + }); + + it("caps each emitted chunk at MAX_CHUNK_MS even with a large backlog", async () => { + // 8000 samples per channel = 500 ms backlog at 16 kHz. Without the cap + // this would emit a single 16,000-byte (>1000 ms equivalent on the wire) + // message; with the cap it drains in multiple ≤200 ms (3200-sample) sends. + rt.sendAudio(loudPcm(8000), { channel: "mic" }); + rt.sendAudio(silentPcm(8000), { channel: "system" }); + + let totalSamples = 0; + const maxSamplesPerSend = Math.round(16_000 * (200 / 1000)); + while (totalSamples < 8000) { + const msg = (await server.nextMessage) as ArrayBuffer; + const samples = msg.byteLength / 2; + expect(samples).toBeLessThanOrEqual(maxSamplesPerSend); + expect(samples).toBeGreaterThan(0); + totalSamples += samples; + } + expect(totalSamples).toBe(8000); + }); + + it("clears the flush timer when the socket is closed externally", async () => { + const spy = jest.spyOn(global, "clearInterval"); + rt.sendAudio(silentPcm(160), { channel: "mic" }); + rt.sendAudio(silentPcm(160), { channel: "system" }); + // Close the client-side WebSocket without going through transcriber.close(). + // The onclose handler should fire and clear the flush timer so subsequent + // ticks don't try to call send() on a dead socket. + type SocketHolder = { socket?: { close: () => void } }; + (rt as unknown as SocketHolder).socket?.close(); + // Yield a tick so the onclose event drains. + await new Promise((resolve) => setTimeout(resolve, 20)); + expect(spy).toHaveBeenCalled(); + spy.mockRestore(); + + // Replace rt/server so afterEach can run the standard teardown. + WS.clean(); + server = new WS(websocketBaseUrl); + rt = new StreamingTranscriber({ + websocketBaseUrl, + token: "t", + sampleRate: 16_000, + speechModel: "u3-rt-pro", + channels: [{ name: "mic" }, { name: "system" }], + channelAttribution: { flushIntervalMs: 50 }, + }); + await connect(rt, server); + }); + + it("close() clears the flush timer", async () => { + const spy = jest.spyOn(global, "clearInterval"); + rt.sendAudio(silentPcm(160), { channel: "mic" }); + rt.sendAudio(silentPcm(160), { channel: "system" }); + + const closePromise = rt.close(); + server.send(JSON.stringify(sessionTerminatedMessage)); + await closePromise; + await server.closed; + WS.clean(); + expect(spy).toHaveBeenCalled(); + spy.mockRestore(); + + // Replace rt/server so afterEach can run the standard teardown without errors. + server = new WS(websocketBaseUrl); + rt = new StreamingTranscriber({ + websocketBaseUrl, + token: "t", + sampleRate: 16_000, + speechModel: "u3-rt-pro", + }); + await connect(rt, server); + }); +}); + +describe("StreamingTranscriber resolveUnknownChannelsMethod = 'window'", () => { + let server: WS; + let rt: StreamingTranscriber; + + beforeEach(async () => { + server = new WS(websocketBaseUrl); + rt = new StreamingTranscriber({ + websocketBaseUrl, + token: "t", + sampleRate: 16_000, + speechModel: "u3-rt-pro", + channels: [{ name: "mic" }, { name: "system" }], + channelAttribution: { + resolveUnknownChannelsMethod: "window", + resolutionWindowWords: 2, + }, + }); + await connect(rt, server); + }); + + afterEach(async () => { + await teardown(rt, server); + }); + + /** + * Build a TurnEvent whose words already have pre-set `channel` values, so + * tests can exercise the unknown-channel resolver without driving VAD audio + * through the transcriber. attributeTurn() will overwrite the channels with + * what it sees on the timeline — but for words whose timestamps don't + * overlap any timeline frame, attributeTurn returns "unknown", preserving + * the pre-set values via... actually no, it sets unknown. + * + * To get reliable per-word channel values we drive the timeline first. + */ + function makeTurn( + words: Array<{ + text: string; + start: number; + end: number; + speaker?: string; + }>, + ): TurnEvent { + return { + type: "Turn", + turn_order: 1, + turn_is_formatted: false, + end_of_turn: true, + transcript: words.map((w) => w.text).join(" "), + end_of_turn_confidence: 0.9, + speaker_label: words[0].speaker ?? "A", + words: words.map((w) => ({ + start: w.start, + end: w.end, + confidence: 1, + text: w.text, + word_is_final: true, + speaker: w.speaker ?? "A", + })), + }; + } + + it("fills an unknown word surrounded by mic neighbors and marks it channelResolved", async () => { + const received: TurnEvent[] = []; + rt.on("turn", (t) => received.push(t)); + + // Drive ~200ms of mic-loud audio so the first two words get a confident + // "mic" attribution from per-word VAD. + for (let i = 0; i < 10; i++) { + rt.sendAudio(loudPcm(320), { channel: "mic" }); + rt.sendAudio(silentPcm(320), { channel: "system" }); + } + + // Word at [10000, 10100] has no VAD frames → unknown. Surrounding mic + // words should let the window resolver fill it as mic. + server.send( + JSON.stringify( + makeTurn([ + { text: "hello", start: 20, end: 80 }, + { text: "there", start: 100, end: 180 }, + { text: "a", start: 10_000, end: 10_100 }, // no VAD evidence + ]), + ), + ); + + const turn = received[0]; + expect(turn.words[0].channel).toBe("mic"); + expect(turn.words[1].channel).toBe("mic"); + expect(turn.words[2].channel).toBe("mic"); + expect(turn.words[2].channelResolved).toBe(true); + expect(turn.words[0].channelResolved).toBeUndefined(); + expect(turn.channel).toBe("mic"); + }); + + it("leaves an unknown word alone if it has no non-unknown neighbors", async () => { + const received: TurnEvent[] = []; + rt.on("turn", (t) => received.push(t)); + + // No audio driven → all words attributeWord-resolve to "unknown". + server.send( + JSON.stringify( + makeTurn([ + { text: "x", start: 0, end: 100 }, + { text: "y", start: 100, end: 200 }, + { text: "z", start: 200, end: 300 }, + ]), + ), + ); + + const turn = received[0]; + for (const w of turn.words) { + expect(w.channel).toBe("unknown"); + expect(w.channelResolved).toBeUndefined(); + } + }); + + it("leaves an unknown word alone on a neighbor tie (1 mic + 1 system)", async () => { + const received: TurnEvent[] = []; + rt.on("turn", (t) => received.push(t)); + + // Drive both: 200ms loud on mic AND loud on system. First we need mic-only + // for the first word, then system-only for the third word, separated by + // an unknown middle. That requires two distinct time windows. + // Window 1 (0-200ms): mic loud, system silent → first word resolves to mic. + for (let i = 0; i < 10; i++) { + rt.sendAudio(loudPcm(320), { channel: "mic" }); + rt.sendAudio(silentPcm(320), { channel: "system" }); + } + // Window 2 (200-400ms): mic silent, system loud → third word resolves to system. + for (let i = 0; i < 10; i++) { + rt.sendAudio(silentPcm(320), { channel: "mic" }); + rt.sendAudio(loudPcm(320), { channel: "system" }); + } + + server.send( + JSON.stringify( + makeTurn([ + { text: "first", start: 20, end: 180 }, + { text: "middle", start: 5_000, end: 5_100 }, // unknown, no VAD + { text: "third", start: 220, end: 380 }, + ]), + ), + ); + + const turn = received[0]; + expect(turn.words[0].channel).toBe("mic"); + expect(turn.words[2].channel).toBe("system"); + // Tie between mic and system neighbors → middle stays unknown. + expect(turn.words[1].channel).toBe("unknown"); + expect(turn.words[1].channelResolved).toBeUndefined(); + }); + + it("never modifies confident per-word VAD decisions", async () => { + const received: TurnEvent[] = []; + rt.on("turn", (t) => received.push(t)); + + // Drive mic-loud audio so per-word VAD confidently calls each word "mic". + for (let i = 0; i < 10; i++) { + rt.sendAudio(loudPcm(320), { channel: "mic" }); + rt.sendAudio(silentPcm(320), { channel: "system" }); + } + + server.send( + JSON.stringify( + makeTurn([ + { text: "alpha", start: 20, end: 80 }, + { text: "beta", start: 100, end: 180 }, + ]), + ), + ); + + const turn = received[0]; + expect(turn.words[0].channel).toBe("mic"); + expect(turn.words[1].channel).toBe("mic"); + // No words were unknown → no resolution markers should be set. + expect(turn.words[0].channelResolved).toBeUndefined(); + expect(turn.words[1].channelResolved).toBeUndefined(); + }); +}); + +describe("StreamingTranscriber resolveUnknownChannelsMethod = 'none'", () => { + let server: WS; + let rt: StreamingTranscriber; + + beforeEach(async () => { + server = new WS(websocketBaseUrl); + rt = new StreamingTranscriber({ + websocketBaseUrl, + token: "t", + sampleRate: 16_000, + speechModel: "u3-rt-pro", + channels: [{ name: "mic" }, { name: "system" }], + channelAttribution: { resolveUnknownChannelsMethod: "none" }, + }); + await connect(rt, server); + }); + + afterEach(async () => { + await teardown(rt, server); + }); + + it("leaves unknown words untouched", async () => { + const received: TurnEvent[] = []; + rt.on("turn", (t) => received.push(t)); + + // Drive 200ms of mic-loud audio so we have a baseline timeline. + for (let i = 0; i < 10; i++) { + rt.sendAudio(loudPcm(320), { channel: "mic" }); + rt.sendAudio(silentPcm(320), { channel: "system" }); + } + + // Send a Turn with one word inside the mic-active window (would resolve to mic + // even without resolution since VAD attributes it) plus one outside any VAD + // window (would only be filled by resolution). + const turn: TurnEvent = { + type: "Turn", + turn_order: 1, + turn_is_formatted: false, + end_of_turn: true, + transcript: "hello x", + end_of_turn_confidence: 0.9, + speaker_label: "A", + words: [ + { + start: 20, + end: 180, + confidence: 1, + text: "hello", + word_is_final: true, + speaker: "A", + }, + { + start: 100_000, + end: 100_200, + confidence: 1, + text: "x", + word_is_final: true, + speaker: "A", + }, + ], + }; + server.send(JSON.stringify(turn)); + + const got = received[0]; + // First word: per-word VAD attributed it (mic) — unchanged. + expect(got.words[0].channel).toBe("mic"); + // Second word: no VAD evidence in window → unknown, and "none" leaves it. + expect(got.words[1].channel).toBe("unknown"); + expect(got.words[1].channelResolved).toBeUndefined(); + }); +}); + +describe("StreamingTranscriber resolveUnknownChannelsMethod default", () => { + let server: WS; + let rt: StreamingTranscriber; + + beforeEach(async () => { + server = new WS(websocketBaseUrl); + // No channelAttribution at all — default should fire: resolveUnknownChannelsMethod: "window". + rt = new StreamingTranscriber({ + websocketBaseUrl, + token: "t", + sampleRate: 16_000, + speechModel: "u3-rt-pro", + channels: [{ name: "mic" }, { name: "system" }], + }); + await connect(rt, server); + }); + + afterEach(async () => { + await teardown(rt, server); + }); + + it("resolves unknown words by default (no channelAttribution passed)", async () => { + const received: TurnEvent[] = []; + rt.on("turn", (t) => received.push(t)); + + for (let i = 0; i < 10; i++) { + rt.sendAudio(loudPcm(320), { channel: "mic" }); + rt.sendAudio(silentPcm(320), { channel: "system" }); + } + + server.send( + JSON.stringify({ + type: "Turn", + turn_order: 1, + turn_is_formatted: false, + end_of_turn: true, + transcript: "hello a there", + end_of_turn_confidence: 0.9, + speaker_label: "A", + words: [ + { + start: 20, + end: 80, + confidence: 1, + text: "hello", + word_is_final: true, + speaker: "A", + }, + { + // No timeline coverage → resolves to "unknown" via per-word VAD. + start: 10_000, + end: 10_100, + confidence: 1, + text: "a", + word_is_final: true, + speaker: "A", + }, + { + start: 100, + end: 180, + confidence: 1, + text: "there", + word_is_final: true, + speaker: "A", + }, + ], + }), + ); + + const turn = received[0]; + expect(turn.words[1].channel).toBe("mic"); + expect(turn.words[1].channelResolved).toBe(true); + }); +}); + +describe("StreamingTranscriber resolveUnknownChannelsMethod = 'speaker-history'", () => { + let server: WS; + let rt: StreamingTranscriber; + + beforeEach(async () => { + server = new WS(websocketBaseUrl); + rt = new StreamingTranscriber({ + websocketBaseUrl, + token: "t", + sampleRate: 16_000, + speechModel: "u3-rt-pro", + channels: [{ name: "mic" }, { name: "system" }], + channelAttribution: { + resolveUnknownChannelsMethod: "speaker-history", + // Low threshold so a single turn's worth of audio is enough. + speakerHistoryMinRmsEvidence: 0.01, + speakerHistoryDominanceRatio: 3, + }, + }); + await connect(rt, server); + }); + + afterEach(async () => { + await teardown(rt, server); + }); + + function makeTurn( + words: Array<{ + text: string; + start: number; + end: number; + speaker?: string; + }>, + ): TurnEvent { + return { + type: "Turn", + turn_order: 1, + turn_is_formatted: false, + end_of_turn: true, + transcript: words.map((w) => w.text).join(" "), + end_of_turn_confidence: 0.9, + speaker_label: words[0].speaker ?? "A", + words: words.map((w) => ({ + start: w.start, + end: w.end, + confidence: 1, + text: w.text, + word_is_final: true, + speaker: w.speaker ?? "A", + })), + }; + } + + it("fills an unknown word when the speaker has dominant session evidence", async () => { + const received: TurnEvent[] = []; + rt.on("turn", (t) => received.push(t)); + + // Drive ~200ms of mic-loud audio so speaker A's first words accumulate + // strong mic-side evidence. + for (let i = 0; i < 10; i++) { + rt.sendAudio(loudPcm(320), { channel: "mic" }); + rt.sendAudio(silentPcm(320), { channel: "system" }); + } + + // First turn: two confident mic words for speaker A. After this, A has + // strong mic evidence in the speaker-history map. + server.send( + JSON.stringify( + makeTurn([ + { text: "hello", start: 20, end: 80 }, + { text: "there", start: 100, end: 180 }, + ]), + ), + ); + expect(received[0].words[0].channel).toBe("mic"); + + // Second turn: speaker A says one word outside any VAD coverage → unknown + // via per-word VAD, but speaker-history should fill it as mic. + server.send( + JSON.stringify( + makeTurn([{ text: "yeah", start: 100_000, end: 100_500 }]), + ), + ); + const turn = received[1]; + expect(turn.words[0].channel).toBe("mic"); + expect(turn.words[0].channelResolved).toBe(true); + }); + + it("does not fill when the speaker has split evidence across channels", async () => { + const received: TurnEvent[] = []; + rt.on("turn", (t) => received.push(t)); + + // Drive equal loud audio on both channels. + for (let i = 0; i < 20; i++) { + rt.sendAudio(loudPcm(320), { channel: "mic" }); + rt.sendAudio(loudPcm(320), { channel: "system" }); + } + + // First turn covers the dual-loud period; per-word VAD will tie → unknown. + server.send( + JSON.stringify(makeTurn([{ text: "both", start: 20, end: 380 }])), + ); + expect(received[0].words[0].channel).toBe("unknown"); + + // Future word with no VAD evidence — speaker A's history is split, so no fill. + server.send( + JSON.stringify(makeTurn([{ text: "x", start: 100_000, end: 100_400 }])), + ); + expect(received[1].words[0].channel).toBe("unknown"); + expect(received[1].words[0].channelResolved).toBeUndefined(); + }); + + it("does not modify confident per-word VAD decisions", async () => { + const received: TurnEvent[] = []; + rt.on("turn", (t) => received.push(t)); + + // Build heavy SYSTEM evidence first. + for (let i = 0; i < 20; i++) { + rt.sendAudio(silentPcm(320), { channel: "mic" }); + rt.sendAudio(loudPcm(320), { channel: "system" }); + } + server.send( + JSON.stringify(makeTurn([{ text: "hi", start: 20, end: 380 }])), + ); + expect(received[0].words[0].channel).toBe("system"); + + // Now feed mic-loud audio. + for (let i = 0; i < 5; i++) { + rt.sendAudio(loudPcm(320), { channel: "mic" }); + rt.sendAudio(silentPcm(320), { channel: "system" }); + } + // Per-word VAD will confidently say "mic" for the new turn. Speaker + // history might still favor system (5 mic frames << 20 system frames), + // but speaker-history must not override confident VAD decisions. + server.send( + JSON.stringify(makeTurn([{ text: "actually", start: 460, end: 560 }])), + ); + expect(received[1].words[0].channel).toBe("mic"); + expect(received[1].words[0].channelResolved).toBeUndefined(); + }); + + it("never modifies speaker_label", async () => { + const received: TurnEvent[] = []; + rt.on("turn", (t) => received.push(t)); + + for (let i = 0; i < 20; i++) { + rt.sendAudio(loudPcm(320), { channel: "mic" }); + rt.sendAudio(silentPcm(320), { channel: "system" }); + } + server.send( + JSON.stringify(makeTurn([{ text: "hi", start: 20, end: 380 }])), + ); + expect(received[0].speaker_label).toBe("A"); + expect(received[0].words[0].speaker).toBe("A"); + }); +});