perf(metrics): batch tokenization with defer-to-flush drain#350

Open
viraatc wants to merge 20 commits into main from perf/tok-batch-clean

Conversation

@viraatc

@viraatc viraatc commented Jun 9, 2026

Collaborator

What

ISL/OSL/TPOT need a tokenizer pass per completion. main dispatches one
asyncio task per event into a 2-thread pool — at high completion rates the
backlog grows unboundedly and the end-of-run drain takes ~an hour per million
samples. This PR batches: triggers enqueue O(1); a small live lane keeps live
metrics current; the end-of-run drain tokenizes everything left through a
process-sharded pool that uses the whole machine.

How

  • BatchTokenizer — the drain runs encode_batch_fast (Rust, rayon)
    across auto-sized worker processes, one pinned per 8-core block of the
    allowed CPU universe (probed via expand_to_all_online_cpus(), then the
    aggregator's inherited mask is restored — the service stays wherever
    the parent placed it). No silent fallbacks: a tokenizer without a fast
    backend, or a failed/over-budget warmup, is a clean startup error. macOS
    shards unpinned (rayon capped per worker) at full speed.
  • Live lane — in-process threads (--metrics-tokenizer-workers, schema
    default 2, the pre-existing knob and footprint; 0 = defer everything to
    the drain), rayon-capped, slice-capped per flush. Owned by the queue
    (start_live); the publisher knows nothing about tokenization.
  • TokenBatchQueue — buffers (text, on_count) per event; live
    failures/cancellations re-queue items (no sample loss), drain failures are
    terminal and stay counted in n_pending_tasks (incomplete-drain contract:
    state == complete && n_pending_tasks > 0). Drain budget --drain-timeout
    (default 300 s, 0 = unlimited); finalize always runs.
  • MetricsTable is fully synchronous; CORES_PER_WORKER is a module
    constant. Defaults are single-sourced in config/schema.py
    (metrics_drain_timeout_s 300 s, metrics_tokenizer_workers 2); the
    service args are required and always forwarded by the benchmark.

Validation

  • Unit suite green (176 metrics-aggregator: queue contract, shard sizing,
    drain timeout/failure, live requeue, RAYON caps, wiring seams);
    pre-commit clean. Offline-burst e2e: state=complete, all series
    populated, drain to n_pending_tasks=0.
  • Sharding is default-on through the real launch path (verified on a
    48-core x86 host and a 144-core GB200): the drain shards span the machine
    while the aggregator keeps its inherited mask.

Tokenizer micro-benchmark (GB200, real DeepSeek-R1 tokenizer)

144-core Grace, corpus = MLPerf DS-R1 prompts tiled to the dataset-mean OSL
of 3877 tokens; identical token counts both sides.

| impl | parallelism | texts/s | tokens/s | speedup |
|---|---|---|---|---|
| main | 2 threads, per-text encode | 313 | 1.21 M | baseline |
| this PR | 18 shards, batched encode | 11,951 | 46.3 M | 38× |

1M-sample end-to-end A/B vs main

Offline 1M samples, streaming, DS-R1 tokenizer, server-paced at 8k QPS with
~1k-token outputs. Both sides: 1,000,000/1,000,000, state=complete,
n_pending_tasks=0, identical token series.

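| host | impl | backlog at ENDED | drain | total | speedup |
|---|---|---|---|---|---|
| GB200 144c | main | 2,970,972 | 3,362 s | 58.1 min | baseline |
| GB200 144c | this PR | 2,782,912 | 42.9 s | 3.2 min | 18.1× |
| B200 192c | main | 2,994,925 | 3,286 s | 56.9 min | baseline |
| B200 192c | this PR | 2,788,032 | 61 s | 3.4 min | 16.5× |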

Measured on the final design (in-process live lane, --tokenizer-workers 2,
300 s drain default). The live lane keeps ~7% of tokenizations current; the
rest (~2.78M) defer to the end-of-run drain, which the sharded pool clears in
43-61 s. A 1M-sample run needs the 300 s budget; with 60 s the backlog is
dropped. The main rows were run with an unlimited drain budget (otherwise they
never finish); they and the micro-benchmark are unaffected by the drain default.
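
As a quick check, the average drain rates implied by the A/B table can be derived with a few lines of arithmetic. The inputs are copied from the table; the rates and ratios are computed here, not measured, and the function names are illustrative.

```python
# (backlog at ENDED, drain seconds), copied from the A/B table above.
RUNS = {
    ("GB200 144c", "main"): (2_970_972, 3362.0),
    ("GB200 144c", "this PR"): (2_782_912, 42.9),
    ("B200 192c", "main"): (2_994_925, 3286.0),
    ("B200 192c", "this PR"): (2_788_032, 61.0),
}


def drain_rate(backlog: int, drain_s: float) -> float:
    """Average tokenizations cleared per second during the drain."""
    return backlog / drain_s


def drain_speedup(host: str) -> float:
    """Ratio of main's drain time to this PR's drain time on one host."""
    return RUNS[(host, "main")][1] / RUNS[(host, "this PR")][1]


def print_rates() -> None:
    for (host, impl), (backlog, drain_s) in RUNS.items():
        print(f"{host:11s} {impl:8s} {drain_rate(backlog, drain_s):>10,.0f} items/s")
```

The table's speedup column compares total wall time, so the drain-only ratio from `drain_speedup` comes out larger. The B200 drain time of 61 s is also already past a 60 s budget on its own.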

🤖 Generated with Claude Code

@viraatc viraatc requested a review from a team June 9, 2026 20:33
@github-actions

github-actions Bot commented Jun 9, 2026

MLCommons CLA bot All contributors have signed the MLCommons CLA ✍️ ✅

@github-actions github-actions Bot requested review from arekay-nv and nvzhihanj June 9, 2026 20:33

@gemini-code-assist gemini-code-assist Bot left a comment

Contributor

Code Review

This pull request replaces the thread-based TokenizePool with a process-sharded BatchTokenizer and a TokenBatchQueue to buffer and batch tokenization work (ISL/OSL/TPOT) during metrics aggregation, preventing the system from falling behind on high-throughput runs. The review feedback highlights critical reliability improvements in token_metrics.py. Specifically, it is recommended to wrap the queue's flush logic in a try...finally block to prevent self._inflight from leaking on exceptions or cancellations. Additionally, count_texts and count_texts_async should explicitly check if the tokenizer is closed, and close() should wait for process pools to shut down to avoid resource leaks.

Comment thread src/inference_endpoint/async_utils/services/metrics_aggregator/token_metrics.py Outdated
Comment thread src/inference_endpoint/async_utils/services/metrics_aggregator/token_metrics.py Outdated
Comment thread src/inference_endpoint/async_utils/services/metrics_aggregator/token_metrics.py Outdated
@viraatc viraatc force-pushed the perf/tok-batch-clean branch 2 times, most recently from 39b4a9b to c1d2cb7 Compare June 10, 2026 01:12
@viraatc viraatc force-pushed the perf/tok-batch-clean branch from b1395ab to 1e502c5 Compare June 10, 2026 21:57

@nvzhihanj nvzhihanj left a comment

Collaborator

Review Council — re-audit after rebase (HEAD 4633699d)

Reviewed by: Claude + Codex (low reasoning, correctness pass) · Depth: thorough. Focus (as requested): is the metrics-tokenization change modular / clean / non-intrusive (existing benchmark behavior preserved), and are redundant/meaningless tests added.

Verdict: the rebase reduced the intrusiveness but did not resolve it. Replacing the pre_publish hook (full tokenizer pool fired from the publish tick) with a bounded single-shard live lane is a real improvement. But mid-run tokenization is still on by default (--live-tokenizers defaults to 1 → 0.25s live flush), the live shard runs on the highest core block which overlaps the HTTP workers (compute_affinity_plan Phase-3 spillover), and the PR removed the only opt-out (metrics_tokenizer_workers) without replacement — so the observability component can perturb the SUT during measurement and the operator can't turn it off via config. Headline recommendation: default --live-tokenizers 0 for measurement-grade runs (defer all tokenization to the post-run drain), or confine the live shard to cores disjoint from worker_cpu_sets; restore a benchmark-reachable knob. (A1, A2, A3.)

Otherwise clean / non-intrusive. The change stays in the aggregator subprocess (only cross-module touch: importing endpoint_client.cpu_affinity). The consumer contract is verified intact: SessionState, the MetricsSnapshot schema, publisher cadence, and the state==COMPLETE && n_pending_tasks>0 incomplete-drain signal are unchanged; flush_remaining is bounded by the drain budget and never raises; the live-loop's failure cannot skip publish_final. The "shard or exit cleanly" fallback and the unpinned-without-affinity (macOS) path are correct and tested.

Tests: no redundant or meaningless tests. The new branches are mostly well covered with behavior-grounded assertions (the _setup_shards decision matrix, no-fast-backend-exit, unpinned-without-affinity, warmup-failure-exit, flush_remaining timeout/failure, live-loop start/stop/survives-failure, expand_to_all_online_cpus). Removing the old metrics_tokenizer_workers tests was correct (dead). The problems are coverage gaps, not redundancy: the aggregator-side start_live wiring is untested (A5) and TestAggregatorArgs no longer pins the forwarded-args contract (A6). Two _FakeProc-injection tests are borderline-coupled to internals but still verify fan-out/reassembly; TestEvenChunks is trivial-but-cheap. No mock-only or duplicate tests found.

Codex findings — not posted: (1) a multi-turn-ISL precompute regression at execute.py:351 — that's PR #349's change, out of scope here; (2) a shutdown(wait=False) worker-terminate race — _terminate_procs already defensively handles _processes is None and CPython doesn't synchronously null it, so the specific mechanism couldn't be verified → dropped. Existing gemini/github-code-quality token_metrics.py comments (flush-exception inflight; closed-tokenizer guards; close() shutdown leak; Protocol ...pass) are unaddressed but deduped here, not re-posted.

Comment thread src/inference_endpoint/commands/benchmark/execute.py
Comment thread src/inference_endpoint/async_utils/services/metrics_aggregator/token_metrics.py Outdated
Comment thread docs/async_utils/services/metrics_aggregator/DESIGN.md Outdated
Comment thread tests/unit/commands/test_benchmark.py
Comment thread src/inference_endpoint/async_utils/services/metrics_aggregator/__main__.py Outdated
Comment thread src/inference_endpoint/async_utils/services/metrics_aggregator/aggregator.py Outdated
viraatc and others added 13 commits June 15, 2026 21:38
Replace the per-event async tokenize model (one asyncio task per sample's
ISL/OSL/TPOT) with a deferred batch design that keeps tokenization ahead of
completions on high-completion-rate runs, where the per-event tasks otherwise
piled up faster than a single tokenizer thread could clear them and stretched
the end-of-run drain.

- BatchTokenizer: counts whole batches via the raw tokenizers backend
  (encode_batch_fast), sharded across worker processes each pinned to a
  disjoint CORES_PER_WORKER-core block so their rayon pools stay NUMA-local.
  Falls back to a single in-process thread when there is no fast backend or
  fewer than two core blocks fit.
- TokenBatchQueue: triggers enqueue (text/message + a recorder callback)
  instead of spawning tasks; the buffer is tokenized in one sharded call at
  each publish tick (live ISL/OSL/TPOT) and once at end-of-run
  (flush_remaining, bounded by the drain budget). n_pending_tasks now counts
  un-tokenized items, preserving the Report "incomplete drain" contract.
- MetricsTable is now fully synchronous (drops the in-flight task set,
  drain_tasks, and in_flight_tasks_count).
- CORES_PER_WORKER is a module constant; removes the metrics_tokenizer_workers
  config knob (schema/execute/CLI) and regenerates the YAML templates.

Validated: 234 unit + 3 integration tests pass. Offline-burst e2e (echo
server, streaming, real tokenizer) shows a 3000-tokenization backlog at ENDED
drained to n_pending_tasks=0 with the final report state=complete.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

The ENDED drain sat outside the finalization try/finally and
flush_remaining caught only TimeoutError: any other tokenizer failure
(e.g. BrokenProcessPool from a dead shard) escaped the fire-and-forget
process() task, skipped publish_final, and hung the subprocess with no
final_snapshot.json. The drain now runs inside the finalization
boundary and flush_remaining swallows non-timeout failures, logs them,
and returns the un-tokenized count — surfacing as an incomplete drain
(n_pending_tasks > 0) instead of a hang.
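
A rough sketch of the behaviour this commit describes: a drain that is bounded by a budget, never raises, and sits inside the finalization boundary. The function names and the `drain`, `pending`, and `publish_final` hooks are hypothetical stand-ins, not the aggregator's real signatures.

```python
import asyncio
import logging
from typing import Awaitable, Callable

logger = logging.getLogger("drain_sketch")


async def flush_remaining_sketch(
    drain: Callable[[], Awaitable[None]],
    pending: Callable[[], int],
    budget_s: float,
) -> int:
    """Run the drain under a wall-clock budget and never raise.

    budget_s == 0 means unlimited. Returns the count still un-tokenized.
    """
    try:
        await asyncio.wait_for(drain(), timeout=budget_s or None)
    except asyncio.TimeoutError:
        logger.warning("drain exceeded its %.1fs budget", budget_s)
    except Exception:
        # e.g. a BrokenProcessPool from a dead shard: log it, do not propagate.
        logger.exception("drain failed")
    return pending()


async def finalize_sketch(
    drain: Callable[[], Awaitable[None]],
    pending: Callable[[], int],
    budget_s: float,
    publish_final: Callable[[int], None],
) -> None:
    try:
        await flush_remaining_sketch(drain, pending, budget_s)
    finally:
        publish_final(pending())  # runs whatever happened to the drain
```

With this shape a timeout and a tokenizer crash look the same to the consumer: the final snapshot is still published, with a non-zero pending count.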

Cleanup (review feedback):
- delete the test-only sync API (count_texts / token_count /
  token_count_message); production uses only the async paths, and
  count_texts_async now raises RuntimeError after close()
- rename AsyncTokenTrigger -> TokenTrigger (fire() is sync; it enqueues)
- extract _encode_batch_lengths shared by the worker and in-process paths
- pending_tokens property collapses the triple None-guard; the SIGTERM
  handler takes a pending_tokens callback instead of reaching into
  aggregator._token_queue
- drop vestigial return None and quoted forward-ref; trim stale
  "async tasks" wording in docs and the drain-timeout help text
  (templates regenerated); document the wait=False shard shutdown

Tests: sharded-path reassembly + BrokenProcessPool propagation,
_even_chunks, and queue/aggregator drain-failure regression tests.
145 aggregator unit + 160 config/commands/integration tests pass;
pre-commit clean.

Validated on GB200 (ptyche, 144-core Grace, 18 shards, real DeepSeek-R1
tokenizer at mean OSL 3877): 38x vs the per-event pool; 1M-output drain
84s vs ~53min.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

…izer-workers

Review-council + e2e findings on the batch-tokenization branch. The
tokenizer drain runs after the benchmark, so the loadgen/worker affinity
partition does not apply to it — but the aggregator subprocess inherited
the loadgen's narrow pin (subprocess.Popen propagates the parent mask)
and sharding silently never engaged under the default
enable_cpu_affinity=true.

- cpu_affinity: add expand_to_all_online_cpus() — reset the current
  process to every online CPU (kernel still clamps to the cgroup/Slurm
  cpuset). The aggregator calls it before constructing the tokenizer, so
  shards size to the full machine by default.
- Restore the --tokenizer-workers service flag with shard semantics:
  -1 auto (one process per 8-core block), explicit count clamped to
  capacity, 0 disables sharding. Every fallback path logs its reason and
  the success log includes setup time.
- flush() phase isolation: a text-batch failure no longer drops the
  message items (separate failure scopes per executor; first error
  re-raised after both phases), and a raising recorder callback is
  logged instead of poisoning the rest of the batch.
- Shard workers ignore SIGINT: Ctrl-C goes to the whole process group;
  the parent drain must control worker lifetime.
- Stale "in-flight async tokenize tasks" wording updated in
  snapshot.py, publisher.py, and AGENTS.md (TokenizePool reference);
  documented the wait=False shard shutdown.
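
The widen-then-read step described above, combined with the restore of the inherited mask mentioned in the PR description, might look like the sketch below. It is not the real `expand_to_all_online_cpus()`: the function name `probe_allowed_cpus` is hypothetical, and using `range(os.cpu_count())` as the CPU id list is a simplifying assumption.

```python
import logging
import os

logger = logging.getLogger("affinity_sketch")


def probe_allowed_cpus() -> set[int]:
    """Return the CPUs this process may use, leaving its own mask unchanged."""
    n_cpus = os.cpu_count() or 1
    if not hasattr(os, "sched_getaffinity"):  # e.g. macOS: no affinity API
        return set(range(n_cpus))
    inherited = os.sched_getaffinity(0)
    try:
        # Request every CPU id; the kernel clamps the request to the cpuset.
        os.sched_setaffinity(0, range(n_cpus))
        return set(os.sched_getaffinity(0))
    except OSError:
        return set(inherited)  # widening refused: keep the inherited view
    finally:
        try:
            os.sched_setaffinity(0, inherited)  # put the parent's placement back
        except OSError:
            logger.warning("could not restore the inherited CPU mask")
```

The returned set is only used to size the shard pool; the calling process stays on whatever mask its parent gave it.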

Validated e2e through the real launch path (echo server, default flags,
48-CPU host): aggregator expands 10 -> 48 CPUs, "BatchTokenizer: 6
shards x 8 cores", drain to n_pending_tasks=0, state=complete. 166 unit
tests pass; pre-commit clean.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

The ellipsis bodies trip the code-quality bot's "statement has no
effect" check on every push; pass is semantically identical for
Protocol method declarations and keeps the report clean.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

…erview

New docs/async_utils/services/metrics_aggregator/DESIGN.md (mirroring the
event_logger convention) covering the service lifecycle and the token
metrics pipeline: defer-to-flush batching, process-sharded batch encoding,
the post-run affinity expansion, failure isolation, and the
n_pending_tasks contract. The services overview 6.2 entry now reflects the
batched tokenizer, the snapshot outputs, and the current CLI flags, and
links the new doc.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

…d shard warmup

Review-council findings (handled locally):

- A persistently failing pre_publish flush aborted every tick before the
  snapshot was built, silently stopping ALL live metrics publishing — not
  just token series. The flush now fails in its own handler (logged once)
  and the tick always proceeds to build and publish; unflushed items stay
  visible as n_pending_tasks. Regression-tested: a failing flush must not
  suppress state capture/publish.
- Shard warmup waits are bounded (_SHARD_WARMUP_TIMEOUT_S): a hung
  tokenizer load (e.g. stuck network filesystem) now degrades to the
  in-process path instead of wedging service startup forever.
- close() and warmup cleanup terminate shard workers (cancel_futures +
  SIGTERM) so an in-flight encode cannot stall interpreter exit after a
  drain timeout.
- TokenCounter protocol stubs use docstring + raise NotImplementedError
  (the one body shape CodeQL, mypy, and Pyright all accept).
- New TestSetupShardsDecisions pins the --tokenizer-workers contract
  (auto/clamp/disable thresholds, block pinning, affinity and warmup
  failure fallbacks) — previously zero coverage of the decision logic.

162 aggregator unit tests pass; pre-commit clean.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

A fully set-up environment (fast Rust tokenizer backend + Linux affinity)
always shards; anything else was previously a silent in-process fallback
that cannot keep up with completions and only surfaces much later as an
incomplete drain. Setup is now strict:

- no fast backend / no CPU affinity / failed or over-budget warmup ->
  RuntimeError, surfaced by the service entry as a FATAL launch failure
- --tokenizer-workers 0 is the only (explicit) in-process mode
- auto mode always shards: max(1, cpus // 8) — the "fewer than two
  blocks" in-process heuristic is gone; one pinned shard below a full
  block
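
The auto-sizing rule above (`max(1, cpus // 8)`, one shard even below a full block) can be sketched as a pure function. `plan_blocks` is a hypothetical name, and leaving leftover CPUs unused is an assumption; the source does not say what happens to them.

```python
from collections.abc import Iterable

CORES_PER_WORKER = 8  # module constant named in the PR; 8 per the "8-core block"


def plan_blocks(cpus: Iterable[int], per_worker: int = CORES_PER_WORKER) -> list[list[int]]:
    """Split the allowed CPUs into disjoint blocks, one per shard.

    A host with fewer than per_worker CPUs still gets one shard that covers
    everything it has. CPUs past the last full block are left unused here.
    """
    ordered = sorted(cpus)
    n_shards = max(1, len(ordered) // per_worker)
    return [ordered[i * per_worker : (i + 1) * per_worker] for i in range(n_shards)]
```

For 144 and 48 allowed CPUs this yields 18 and 6 blocks, which matches the "18 shards" and "6 shards x 8 cores" figures quoted elsewhere in this PR.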

Also converts the new shard-decision tests to context-managed
BatchTokenizer construction (CodeQL: use-with-statement).

164 aggregator unit tests pass; pre-commit clean.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

The affinity API's absence is a platform property, not a broken
environment: sharding works identically without pinning — the OS
scheduler spreads the workers and only cache/NUMA locality is lost.
_setup_shards now sizes blocks from the online CPU count when
sched_getaffinity is unavailable, and each worker that cannot pin caps
its rayon pool to its block size via RAYON_NUM_THREADS so unpinned
shards do not oversubscribe each other.
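
A minimal sketch of the pin-or-cap behaviour described above, assuming it runs in each worker before the tokenizer's first parallel encode (rayon picks up `RAYON_NUM_THREADS` when its global pool is built). The function name and the injectable `env` mapping are hypothetical.

```python
import os
from collections.abc import MutableMapping


def init_shard_worker(
    block: list[int], env: MutableMapping[str, str] = os.environ
) -> bool:
    """Pin this process to `block`, else cap rayon to the block size.

    Returns True when the process was pinned.
    """
    if hasattr(os, "sched_setaffinity"):
        try:
            os.sched_setaffinity(0, block)
            return True
        except OSError:
            pass  # fall through to the unpinned cap
    # Unpinned: limit the thread pool so shards do not oversubscribe each other.
    env["RAYON_NUM_THREADS"] = str(len(block))
    return False
```

Passing a plain dict as `env` makes the fallback easy to inspect without touching the real process environment.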

The strict startup errors remain for genuine environment problems: a
tokenizer without a fast (Rust) backend, and a failed or over-budget
shard warmup.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

…hook

The publisher no longer knows about tokenization: TokenBatchQueue owns
its flush cadence via start_live(interval), removing the pre_publish
callback (and its failure-isolation machinery) added earlier in this
branch. Mid-run flushes go through a bounded live lane —
--live-tokenizers shards (default 1), taken from the highest core
blocks, farthest from the loadgen's low cores — so live ISL/OSL/TPOT
stay current without contending with the benchmark hot path;
--live-tokenizers 0 defers all tokenization to the end-of-run drain,
which always uses every shard.

Live-flush failures and cancellations re-queue the detached items so a
mid-run hiccup never loses samples (the drain retries them); drain
failures remain terminal and pending-counted. Default
metrics-drain-timeout rises 60s -> 300s since the live lane is sized
for currency, not for keeping up with peak completion rates.

For comparison, main tokenizes continuously during the run on 2 threads
inside the aggregator process — which inherits the loadgen's pinned
mask, i.e. directly on the loadgen's cores.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

Workstreams from the full design audit:

- Live flushes take at most _LIVE_FLUSH_MAX_ITEMS per kind: bounds the
  queue-lock hold time, the unstoppable in-flight thread encode left
  behind by a drain-start cancellation (close(wait=True) is now bounded
  by ~one slice), and the drain's re-encode of requeued items.
- BatchTokenizer live_workers ctor default aligned to 2 (the CLI
  default); the aggregator class drain-timeout default aligned to 300s
  (the CLI default); --tokenizer-workers < 0 rejected at startup.
- A failed restore of the inherited CPU mask is logged instead of
  silently leaving the aggregator expanded.
- Comment/docstring hygiene: removed prior-implementation narration and
  stale shard-lane/warmup-degrade/publish-tick wording; SIGTERM-only
  phrasing in publisher docs.
- Tests: shard-decision suite no longer issues real sched_setaffinity
  syscalls (probes and restore are patched and asserted); live lane
  pinned as in-process-only; new coverage for RAYON caps (ctor,
  operator override, per-shard block override), live flush slice cap,
  live cancellation/message-failure requeue, and STARTED arming the
  live loop with ENDED stopping it; live-method aliases on all stubs.
- DESIGN.md rewritten for the final shape (in-process live lane,
  drain-only auto-sized shards, probe-and-restore affinity, requeue
  semantics, diagram + CLI table); services overview and AGENTS.md row
  aligned.

345 unit tests pass; pre-commit clean.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

…args seam

- flush_remaining gathers the cancelled live task (return_exceptions)
  instead of a bare suppressed await; the cancellation test awaits via
  wait_for. Both silence the code-quality ineffectual-statement check
  without changing semantics.
- New TestAggregatorArgs case pins the SUT-intrusion seam: --tokenizer
  is forwarded, and no live/worker knobs are — the service defaults
  deliberately govern mid-run tokenization (review feedback).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

Review feedback (human + council), with the API surface pulled back
toward main:

- A live-flush cancellation landing in the text encode dropped the
  already-detached message items — lost tool-call samples and a final
  snapshot stuck at n_pending_tasks > 0 for work the drain could never
  reach. The text-phase CancelledError handler now re-queues both
  kinds; regression test covers text+message together.
- count_texts_live_async is gone: the live lane is a live= keyword on
  count_texts_async, so the TokenCounter protocol is back to two
  methods and every test stub lost its alias.
- The SIGTERM handler takes the token queue object again (reads
  .pending), not a callable.
- Live flushes take their slice in place (del list[:cap]) instead of
  copying the whole backlog tail under the queue lock each tick.
- Shard warmup budget reduced to 25s so its diagnostic FATAL fires
  before the parent's 30s service-launch kill.
- TestAggregatorArgs pins the SUT-intrusion seam: --tokenizer is
  forwarded; live/worker knobs deliberately are not.

276 unit tests pass; pre-commit clean.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

The service entry wires the SIGTERM handler from the aggregator's table
and token queue; expose them as read-only properties instead of
reaching into private attributes.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

viraatc and others added 6 commits June 15, 2026 21:38
The end-of-run drain runs on the full shard pool, so 60s covers
roughly a million buffered tokenizations on a large node; bigger runs
set --metrics-drain-timeout explicitly (0 = unlimited).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

…docs

metrics_tokenizer_workers returns to DrainConfig (default 2, ge=0; 0 =
defer all to drain) and execute.py forwards it again. --drain-timeout and
--tokenizer-workers become required service args; the aggregator ctor and
BatchTokenizer lose their duplicated defaults. Docs and comments trimmed.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

…edits

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

A 1M-sample run holds ~2M deferred tokenizations at ENDED; the drain
fans the whole buffer into one encode_batch per shard, so a 60s budget
expires before any chunk returns and the entire backlog is dropped.
300s covers 1M-sample runs with headroom.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

The aggregator entrypoint now requires --drain-timeout and
--tokenizer-workers (single-sourced from the schema). The signal-handling
integration test spawns the subprocess directly and still omitted them, so
argparse exited the process (code 2) before any signal handler was
installed. Pass both: --tokenizer-workers 0 (no tokenizer configured, so no
live tokenization) and a small --drain-timeout (never reached — the run is
signalled, not ENDED).
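
The failure mode described here is easy to reproduce with plain argparse. The sketch below uses the two flag names from this commit; the parser itself, its `prog` name, and the help strings are illustrative, not the service's real entrypoint.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Service-style parser whose tuning knobs carry no local default."""
    parser = argparse.ArgumentParser(prog="metrics_aggregator_sketch")
    parser.add_argument(
        "--drain-timeout", type=float, required=True,
        help="seconds to finish tokenizing after ENDED (0 = unlimited)",
    )
    parser.add_argument(
        "--tokenizer-workers", type=int, required=True,
        help="live-lane threads (0 = defer everything to the drain)",
    )
    return parser


def parse_or_exit_code(argv: list[str]) -> int:
    """Return 0 on success, else the exit code argparse would use."""
    try:
        build_parser().parse_args(argv)
    except SystemExit as exc:
        return int(exc.code)
    return 0
```

Omitting either flag makes argparse exit with status 2 during parsing, before any code after `parse_args` (such as signal-handler installation) gets to run.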

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

@viraatc viraatc force-pushed the perf/tok-batch-clean branch from 8f547af to f1ac948 Compare June 16, 2026 04:42

pip-audit flagged aiohttp 3.14.0 for CVE-2026-54273..54280 (8 advisories),
all fixed in 3.14.1. aiohttp is a test-only dependency (mock-server
fixture); production uses the custom httptools client. uv.lock regenerated
to match; uv run pip-audit now reports no known vulnerabilities.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

@arekay-nv arekay-nv left a comment

Collaborator

LGTM. Thanks!

backend's `encode_batch_fast` (Rust, rayon); a single BPE rayon pool
saturates ~8 cores, so disjoint pinned blocks are how the whole machine is
used. Workers are spawn-context, warmed in parallel at construction (bounded
— a hung load is a startup error), and ignore SIGINT.

Collaborator

If we ignore SIGINT, there is no way to terminate/interrupt?

"--drain-timeout",
type=float,
- default=60.0,
+ required=True,

Collaborator

Should this be required? What about a default of 0, which is what most use cases would expect? Enforcing a limit can unnecessarily cause runs to fail by hitting the timeout.

Collaborator

Well if it didn't finish tokenizing, it's invalid anyway - the token count (used in TPS / TPOT calcs) will be lower than the true value.

A more robust solution: if this times out, we provide a subcommand that recomputes result_summary.json offline from events.jsonl. That keeps the run valid and doesn't waste the allocation.

Collaborator

Also this flag is never exposed to the user as a non-developer user would never launch this by hand. The default I assume is maintained by the defaults in the schema.

Comment on lines +166 to +167
required=True,
help=(

Collaborator

Maybe a good default would be nice here (0?) - we might be overloading the user with too many required flags/options which are really tuning knobs.

Collaborator

Also this flag is never exposed to the user as a non-developer user would never launch this by hand. The default I assume is maintained by the defaults in the schema.

Comment on lines +418 to +422
logger.warning(
"tokenizer drain incomplete (budget %s); %d tokenizations "
"did not complete",
budget,
n_pending,

Collaborator

If drain_timeout_s is 0 (unlimited), would this statement still be executed when tokenization hit errors? Do we need to differentiate the two cases to make that clear?

budget,
n_pending,
)
logger.info(

Collaborator

Should this be an else?

)
return
text = self._extract_text(ev_rec, row, pre_change)
if not text:

Collaborator

This would be reached if text is None and message_parts is None. Should we count this as an anomaly?

- class TpotTrigger(AsyncTokenTrigger):
-     """TPOT = (complete_ns - recv_first_ns) / token_count(text_after_first_chunk).
+ class TpotTrigger(TokenTrigger):
+     """TPOT = (complete_ns - recv_first_ns) / output token count.

Collaborator

Can we emphasize that this excludes TTFT? It is a minor but subtle difference in how TPOT can be counted, since one can also count it on the client side as (completed_response - request_sent) / num_tokens.
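
To make the distinction in this comment concrete, here is a small sketch of the two definitions side by side. The first follows the docstring formula quoted above; the second is the client-side variant the reviewer mentions. Function and parameter names are illustrative, and `n_tokens` is whatever output token count the caller chooses to divide by.

```python
def tpot_excluding_ttft(recv_first_ns: int, complete_ns: int, n_tokens: int) -> float:
    """Decode time per output token in ns: the clock starts at the first chunk."""
    return (complete_ns - recv_first_ns) / n_tokens


def tpot_client_side(request_sent_ns: int, complete_ns: int, n_tokens: int) -> float:
    """Whole-request latency per token in ns: TTFT is folded into the result."""
    return (complete_ns - request_sent_ns) / n_tokens
```

For a request sent at t = 0 with the first chunk at 200 ms, completion at 1,200 ms, and 100 output tokens, the first definition gives 10 ms per token and the second gives 12 ms per token.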

Comment on lines -588 to +591
- 60.0,
+ 300.0,
ge=0,
description=(
- "Wall-clock budget (seconds) for the metrics aggregator to drain "
- "in-flight tokenize tasks after ENDED (default: 60.0; 0 = unlimited)."
+ "Wall-clock budget (seconds) to finish tokenizing buffered samples "
+ "after ENDED (default: 300.0; 0 = unlimited)."

Collaborator

Suggestion to change to 0 as default.
Plus ignore above comments on the defaults there - this unifies the defaults to one place.

Collaborator

Nvm I now see you found out 😆

IMO timeout should be on the order of something like 10 min - we have seen veryyy large backlogs in the tokenizer queue before.

@viraatc viraatc requested a review from nv-alicheng June 26, 2026 22:30

@nv-alicheng nv-alicheng left a comment

Collaborator

Review Council — Multi-AI Code Review

Reviewed by: Codex + Claude | Depth: thorough

Posted 9 inline findings. Well-engineered PR — no critical/high defects; the drain/flush hot path is largely correct. Findings are condition-gated edge cases + polish. See summary comment for the tiered breakdown + one untested-path note that couldn't be posted inline (handler code unchanged in the diff).

_terminate_procs(self._procs)
self._procs = []
if self._thread is not None:
self._thread.shutdown(wait=True)

Collaborator

[Review Council · Codex] medium · concurrency: close() does self._thread.shutdown(wait=True) on the live tokenizer thread pool. If the run ends while a live flush is in run_in_executor(...), cancelling the queue's live task only cancels the coroutine; the executor thread keeps tokenizing, and this wait=True blocks process teardown until that encode finishes. For large batches with metrics_tokenizer_workers > 0 this can exceed the --drain-timeout budget the rest of the PR enforces. (Distinct from the adjacent note on the process pools, which correctly use wait=False + SIGTERM via _terminate_procs; the gap is the live thread pool.) Relatedly, the SIGTERM path (__main__.py _signal_finalize) never stops the live flush loop the way the ENDED path does via flush_remaining. Stop/cancel the live lane (and bound the in-flight encode) before the blocking shutdown(wait=True).
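
The mechanism behind this finding can be reproduced in isolation. In the sketch below a blocking function stands in for a long batch encode, and events make the ordering deterministic; none of the names come from the PR.

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor


async def cancel_does_not_stop_thread() -> dict[str, bool]:
    """Show that cancelling the awaiting task leaves the executor job running."""
    pool = ThreadPoolExecutor(max_workers=1)
    started = threading.Event()
    release = threading.Event()
    finished = threading.Event()

    def encode_stand_in() -> None:  # stands in for a long batch encode
        started.set()
        release.wait()
        finished.set()

    loop = asyncio.get_running_loop()

    async def live_flush() -> None:
        await loop.run_in_executor(pool, encode_stand_in)

    task = asyncio.create_task(live_flush())
    await asyncio.to_thread(started.wait)  # wait until the job is on its thread
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    still_running = not finished.is_set()  # the thread is still inside the job
    release.set()  # a real encode cannot be released early like this
    pool.shutdown(wait=True)  # returns only after the job has finished
    return {
        "cancelled": task.cancelled(),
        "still_running_after_cancel": still_running,
        "finished_after_shutdown": finished.is_set(),
    }
```

The task reports as cancelled while the thread is still busy, so the time `shutdown(wait=True)` takes is set by the in-flight job, not by the cancellation.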

Raises:
UnsupportedPlatformError: If not running on Linux.
"""
online = _read_sysfs_cpulist(_SYSFS_CPU / "online") or set()

Collaborator

[Review Council · Codex] medium · bug — When _read_sysfs_cpulist(_SYSFS_CPU / 'online') returns empty (sysfs unavailable/filtered — common in some containers), this skips the sched_setaffinity widening and returns os.sched_getaffinity(0), i.e. the narrow inherited loadgen mask. The metrics-aggregator sharding sizes its drain pool from this, so in those environments the shard pool stays pinned to the loadgen cores and can hit the drain timeout on large runs instead of expanding to the full allowed cpuset. Fall back to widening against the cgroup cpuset (or at least log) when online is unreadable, rather than silently returning the narrow mask.
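One possible shape for the suggested fallback is sketched below. It is an illustration under stated assumptions, not the PR's helper: parse_cpulist, read_cpulist and allowed_cpus are hypothetical names, the candidate paths are passed in by the caller (for example the sysfs online file, then a cgroup cpuset file if the deployment exposes one), and only the plain cpulist form such as 0-3,8 is parsed. When nothing is readable, the sketch logs a warning and returns the inherited mask instead of narrowing silently.

```python
import logging
import os
from pathlib import Path

logger = logging.getLogger("cpu_universe_sketch")


def parse_cpulist(text: str) -> set[int]:
    """Parse a plain cpulist such as '0-3,8,10-11' into a set of CPU ids."""
    cpus: set[int] = set()
    for part in text.strip().split(","):
        if not part:
            continue
        lo, _, hi = part.partition("-")
        cpus.update(range(int(lo), int(hi or lo) + 1))
    return cpus


def read_cpulist(path: Path) -> set[int]:
    """Return the CPUs listed in a file, or an empty set if it is unusable."""
    try:
        return parse_cpulist(path.read_text())
    except (OSError, ValueError):
        return set()


def allowed_cpus(candidates: list[Path]) -> set[int]:
    """Pure query: the first readable, non-empty candidate wins."""
    for path in candidates:
        cpus = read_cpulist(path)
        if cpus:
            return cpus
    # Nothing readable: fall back to the inherited mask, but say so.
    logger.warning("no CPU list readable from %s; using inherited mask", candidates)
    if hasattr(os, "sched_getaffinity"):
        return set(os.sched_getaffinity(0))
    return set(range(os.cpu_count() or 1))
```

Because the function only reads files and the current mask, it never changes the caller's affinity, so there is nothing to restore afterwards.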

# Drain failures are terminal and stay pending-only.
self._text[:0] = text_items
else:
for (_, on_count), count in zip(text_items, counts, strict=True):

Collaborator

[Review Council · Claude] medium · data-integrity — In the drain (live=False) path both phases are detached up front. The text-phase zip(text_items, counts, strict=True) runs in the else: of the count_texts_async try, so a wrong-length tokenizer result raises ValueError after recording the matched prefix and outside the independent-phase failure handling — it propagates straight out of flush(), skipping the message phase. The detached msg_items are then lost (never recorded, never re-queued; _inflight stays elevated). A wrong-length result is a tokenizer contract violation, but it's the one spot trusting output length without isolation — catch the ValueError inside the phase and route it through failure so the message phase still runs.

# unpinned shards from oversubscribing each other.
logger.debug("could not pin tokenizer worker to %s", core_set)
transformers_logging.set_verbosity_error()
tok = AutoTokenizer.from_pretrained(tokenizer_name, trust_remote_code=True)

Collaborator

[Review Council · Claude] low · security — Each spawned shard worker loads the tokenizer with AutoTokenizer.from_pretrained(tokenizer_name, trust_remote_code=True), executing arbitrary repo-hosted Python at import time. The main process already did this pre-PR, but sharding now replicates remote-code execution across N worker processes. The tokenizer name is operator-supplied (not a new trust boundary), but it broadens the blast radius. If trust_remote_code isn't required for the supported tokenizers, consider defaulting it off or gating behind config.

aggregator from publishing the (incomplete) final snapshot.
"""
if self._live_task is not None:
self._live_task.cancel()

Collaborator

[Review Council · Claude] low · error-handling: flush_remaining documents 'Never raises' (the aggregator depends on this to always publish a final snapshot) and guards flush() with except Exception, but the live-task teardown above it is unguarded: self._live_task.cancel(); await asyncio.gather(self._live_task, return_exceptions=True). If flush_remaining is itself cancelled (an outer timeout or shutdown cancelling process()), the await raises CancelledError before reaching the guarded flush(), breaking the absolute 'never raises' contract. Document CancelledError as the one exception, or shield the teardown.
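A small reproduction of this case, written as a sketch with hypothetical stand-ins (live_loop, flush_remaining and demo here are not the PR's implementations): the live task takes a moment to unwind, and the caller is cancelled while it is parked in the gather. return_exceptions=True absorbs the child's cancellation, but not a cancellation of the awaiting coroutine itself.

```python
import asyncio


async def live_loop() -> None:
    """Hypothetical live lane whose cleanup takes a little while."""
    try:
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        await asyncio.sleep(0.1)  # slow cleanup keeps the teardown pending
        raise


async def flush_remaining(live_task: asyncio.Task, events: list[str]) -> None:
    """Hypothetical teardown mirroring the unguarded pattern in the finding."""
    live_task.cancel()
    await asyncio.gather(live_task, return_exceptions=True)  # unguarded await
    events.append("flushed")  # stands in for the guarded flush()


async def demo() -> list[str]:
    events: list[str] = []
    live = asyncio.create_task(live_loop())
    await asyncio.sleep(0)  # let the live task start
    outer = asyncio.create_task(flush_remaining(live, events))
    await asyncio.sleep(0.01)  # outer is now waiting on the teardown
    outer.cancel()  # e.g. an outer timeout or shutdown
    try:
        await outer
    except asyncio.CancelledError:
        events.append("flush_remaining raised CancelledError")
    return events


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In this sketch the guarded step is never reached, which is the gap between the documented contract and the behaviour under cancellation.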

"""
for ex in procs:
    ex.shutdown(wait=False, cancel_futures=True)
    workers = getattr(ex, "_processes", None) or {}  # CPython impl detail.

Collaborator

[Review Council · Claude] low · concurrency: _terminate_procs reaches into the CPython-private _processes attribute: workers = getattr(ex, '_processes', None) or {}. The guard degrades to a silent no-op if a future CPython changes the attribute name or shape, reintroducing exactly the interpreter-exit stall this function exists to prevent, with no signal. Since it is the only thing keeping a drain timeout from stalling process exit, log once if the private attribute is missing so the regression is observable.
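A log-once guard along these lines could be as small as the sketch below. The helper name worker_processes and the warned_once set are hypothetical, and the sketch only distinguishes "attribute absent" from "attribute present"; how the real function should treat an attribute that exists but is empty is left to the author.

```python
import logging

logger = logging.getLogger("terminate_procs_sketch")

# Attribute names we have already warned about, so the log line appears once.
warned_once: set[str] = set()


def worker_processes(executor: object, attr: str = "_processes") -> dict:
    """Return the executor's private worker mapping, warning once if it is absent."""
    if not hasattr(executor, attr):
        if attr not in warned_once:
            warned_once.add(attr)
            logger.warning(
                "executor has no %r attribute; cannot terminate workers directly",
                attr,
            )
        return {}
    # Present but possibly None or empty: treat as "no workers to signal".
    return dict(getattr(executor, attr) or {})
```

The point of the warning is only observability: the no-op path stays, but a change in the private attribute no longer goes unnoticed.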

streaming: bool = False,
shutdown_event: asyncio.Event | None = None,
- drain_timeout_s: float | None = _DEFAULT_DRAIN_TIMEOUT_S,
+ drain_timeout_s: float | None,

Collaborator

[Review Council · Claude] low · api-contract: drain_timeout_s: float | None is a required keyword arg declared after defaulted keyword-only params (tokenizer=None, live_flush_interval_s=None, streaming=False, shutdown_event=None). This is legal because the params are keyword-only, but it is easy to misread as optional, and every test constructing the aggregator must remember to pass it. Group required params before defaulted ones, or give it an explicit None default (which already means 'unlimited' downstream).

return _encode_batch_lengths(backend, texts)
return [len(tok.tokenize(t)) for t in texts] # type: ignore[union-attr]

async def count_texts_async(

Collaborator

[Review Council · Claude] low · design — The TokenCounter Protocol declares count_texts_async(self, texts, loop, /, *, live=False) and token_count_message_async(..., loop, /) with positional-only markers, but the concrete BatchTokenizer methods (here and token_count_message_async) omit the /. It type-checks (concrete is wider) and all call sites pass positionally, so no runtime bug — but the declared contract and implementation disagree. Align the concrete signatures with the Protocol's /.

partial snapshot tagged `INTERRUPTED`.

## Token metrics pipeline

Collaborator

[Review Council · Claude] low · documentation — The lifecycle diagram draws INTERRUPTED as a branch off DRAINING, but the SIGTERM handler is installed before STARTED and _signal_finalize publishes interrupted=True regardless of state — so INTERRUPTED is reachable from any state (INITIALIZE/LIVE included), which is exactly the case the handler exists for. The adjacent prose even says 'SIGTERM writes a best-effort partial snapshot' without requiring DRAINING. Redraw INTERRUPTED as reachable from any state.

@nv-alicheng

Collaborator

Review Council — Multi-AI Code Review

Reviewed by: Codex + Claude | Depth: thorough

9 inline findings across 4 files. No critical/high defects — the defer-to-flush hot path is largely correct: the batch is detached before tokenization, drain failures are terminal-and-pending-only (no double-count, no silent loss on the normal paths), publish_final/_finalize run on every terminal path, idempotency is guarded, and the schema is the single source of truth for defaults (drain-timeout 300s, tokenizer-workers 2 consistent across schema, all 3 templates, AGENTS.md, DESIGN.md). aiohttp 3.14.1 bump consistent in pyproject + uv.lock. Findings below are condition-gated edge cases and polish.

🟡 Should Fix (medium)

| # | File | Line | Category | Reviewer(s) | Summary |
|---|------|------|----------|-------------|---------|
| 1 | metrics_aggregator/token_metrics.py | 461 | concurrency | Codex | close(): self._thread.shutdown(wait=True) on the live thread pool can block teardown past --drain-timeout while a live encode is in-flight; SIGTERM path also doesn't stop the live loop |
| 2 | endpoint_client/cpu_affinity.py | 334 | bug | Codex | sysfs online unreadable → no widening → shard pool pinned to narrow loadgen mask → drain timeout on large runs in sysfs-filtered containers |
| 3 | metrics_aggregator/token_metrics.py | 626 | data-integrity | Claude | drain-phase zip(..., strict=True) in the else bypasses the independent-phase failure handling: a wrong-length tokenizer result records a partial prefix then propagates, dropping the detached message-phase items |

🔵 Consider (low)

| # | File | Line | Category | Reviewer(s) | Summary |
|---|------|------|----------|-------------|---------|
| 4 | metrics_aggregator/token_metrics.py | 135 | security | Claude | trust_remote_code=True now replicated across N shard worker processes; broadens blast radius |
| 5 | metrics_aggregator/token_metrics.py | 666 | error-handling | Claude | flush_remaining 'never raises' contract broken by unguarded live-task teardown vs CancelledError |
| 6 | metrics_aggregator/token_metrics.py | 170 | concurrency | Claude | _terminate_procs silently no-ops if CPython-private _processes shape changes, reintroducing the exit-stall it prevents |
| 7 | metrics_aggregator/aggregator.py | 122 | api-contract | Claude | required kw drain_timeout_s declared after defaulted kw-only params; reads as optional |
| 8 | metrics_aggregator/token_metrics.py | 361 | design | Claude | concrete count_texts_async/token_count_message_async omit the Protocol's positional-only / |
| 9 | metrics_aggregator/DESIGN.md | 24 | documentation | Claude | lifecycle diagram draws INTERRUPTED only off DRAINING; SIGTERM can interrupt from any state |

Not posted inline (handler code unchanged in the diff, so no valid inline anchor):

  • __main__.py _on_sigterm/_signal_finalize · testing (Claude): the SIGTERM-during-active-drain race (handler fires while flush_remaining is mid-drain) is untested. Single-INTERRUPTED-snapshot / shutdown_event-set-once / n_pending_tasks correctness rests on reasoning, not a test. Recommend a test firing the handler against a slow in-flight flush_remaining.

⚠️ Commit hygiene: 20 commits including ~8 apparent fixups (fix(metrics): …, chore(metrics): drain-timeout default back to 60s, etc.). Consider squashing the iteration history before merge.

@nv-alicheng

Collaborator

Design note — scope over architecture

Independent of the line-level findings already posted, a framing for the rework as a whole. The architecture here is forced by the constraints and the PR gets it right: tokenize is heavy CPU work that must not block the loop, must not contend with the loadgen mid-run, must keep up at 50k+ QPS, and must yield exact final numbers. GIL ⇒ real parallelism needs processes; a single BPE rayon pool saturates ~8 cores; the work is only needed for metrics ⇒ it can defer past the run. Those facts make defer-to-flush batching + process-sharding pinned to disjoint core blocks essentially the only answer, and I'd converge on the same — along with no-silent-fallback and the exact-or-flagged n_pending_tasks contract.

Several subtle calls are better than a first pass would make: the sharding-by-core-block thesis is measured (16k vs 1.5k texts/s), the warmup is a bounded startup error that races the launch budget, shards stay idle until ENDED so tokenization never perturbs the running benchmark, and the DESIGN.md actually documents the contracts. Good work.

The one theme worth weighing is scope, not structure: the live in-process tokenization lane roughly doubles the edge-case surface and accounts for most of the review findings (live-cancel, re-queue-on-failure, the live thread-pool blocking teardown, the bounded per-flush cap). The authoritative metrics are computed at the drain regardless; the live lane exists only to keep token metrics current in the TUI mid-run, and --metrics-tokenizer-workers=0 ("defer all") is already the simpler universe.

So if live token metrics are not a hard requirement, a KISS v1 would be drain-only, adding the live lane later if operators ask for live OSL. That removes the live flag threaded through flush/count_texts_async, the re-queue paths, the live-loop lifecycle, and the second executor. If live IS required, the lane as built is a legitimate, well-engineered answer — and the gap then narrows to two refactors:

  1. Make CPU-universe discovery a pure query. The probe currently does getaffinity → expand_to_all_online_cpus() (mutates affinity as a side effect) → restore. The save/probe/restore dance and its restore-failure branch only exist because the helper isn't pure. A pure "list allowed CPUs" query removes the dance and gives a clean cgroup-cpuset fallback when /sys/.../online is unreadable (the affinity finding, addressed at the design level rather than patched).

  2. Split flush() into flush_text + flush_messages. One method juggling a shared failure var, two re-queue paths, and a zip(strict=True) in the else is what produced the data-integrity finding. Two small methods with independent error handling remove that hazard and read better.

None of this is a rework — the bones are right and more thoroughly justified than most perf PRs. It's about trimming optional scope out of v1 and making the two trickiest spots (affinity, the dual-phase flush) simpler.

(Independent design review by Claude at a maintainer's request — not a re-run of the automated council above.)
