Skip to content

feat: add streaming ATOF exporter#172

Closed
OmkarMehta wants to merge 1 commit into
NVIDIA:mainfrom
OmkarMehta:feat/streaming-atof-exporter
Closed

feat: add streaming ATOF exporter#172
OmkarMehta wants to merge 1 commit into
NVIDIA:mainfrom
OmkarMehta:feat/streaming-atof-exporter

Conversation

@OmkarMehta
Copy link
Copy Markdown

@OmkarMehta OmkarMehta commented May 28, 2026

Summary

  • Add a TCP-backed AtofStreamingExporter that connects to a separate local receiver process and streams canonical ATOF JSONL events in real time.
  • Keep socket IO off the synchronous subscriber path by enqueueing serialized events into a bounded queue drained by a dedicated writer thread with a TCP write timeout.
  • Fail setup explicitly if set_nodelay or set_write_timeout(ATOF_STREAM_WRITE_TIMEOUT) cannot be applied, so the exporter never runs without the intended socket behavior.
  • Expose register/deregister helpers so it works through the existing Relay event subscriber path.
  • Document the streaming API as Relay-owned substrate for local viewer/cockpit consumers without putting a product UI into Relay.

Why

Local viewer/cockpit consumers need a stable stream of Relay-native ATOF events without redefining the trace contract or waiting for completed JSONL artifacts. This PR keeps the scope in the Relay substrate: capture/event/export mechanics only, not a product UI.

How

  • AtofStreamingExporterConfig points at a local TCP receiver.
  • AtofStreamingExporter connects to that receiver, configures socket options, and streams one canonical JSON object per newline.
  • Subscriber callbacks serialize and enqueue events, then return quickly. A dedicated writer thread owns socket writes, flushes, shutdown, and stored error propagation.
  • force_flush() and shutdown() use control-message barriers and re-check stored writer errors after the round trip so queued write failures are returned.
  • shutdown() flushes and always attempts to close the stream, even when a stored export error is returned.
  • Tests cover socket delivery, connection failure, socket-configuration error text, first-error preservation, shutdown-after-error EOF behavior, future-events-only runtime registration, and runtime register/deregister integration.

Verification

  • cargo fmt --all --check
  • cargo clippy --workspace --all-targets -- -D warnings
  • cargo test -p nemo-relay observability::atof::tests::streaming_exporter
  • cargo test -p nemo-relay
  • just test-rust
  • just docs

Breaking changes

None. This adds a new exporter surface.

Summary by CodeRabbit

  • New Features

    • Added TCP-based streaming exporter for ATOF events, enabling event forwarding to external receivers over network connections.
    • Introduced configuration, registration/deregistration, flush, and shutdown operations for managing streaming export lifecycle and error handling.
    • Added statistics tracking for event delivery monitoring.
  • Documentation

    • Added streaming API documentation with Rust examples for exporter setup and TCP receiver implementation.

Review Change Stack

@OmkarMehta OmkarMehta requested a review from a team as a code owner May 28, 2026 17:53
@copy-pr-bot
Copy link
Copy Markdown

copy-pr-bot Bot commented May 28, 2026

This pull request requires additional validation before any workflows can run on NVIDIA's runners.

Pull request vetters can view their responsibilities here.

Contributors can view more details about this message here.

@github-actions github-actions Bot added size:M PR is medium lang:rust PR changes/introduces Rust code labels May 28, 2026
@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented May 28, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review

Walkthrough

Adds a TCP-backed AtofStreamingExporter that streams canonical ATOF JSONL lines over TCP, stream-specific exporter error variants, generalized JSONL write helpers, unit tests for delivery and failure modes, and documentation for the Streaming API.

Changes

TCP-backed ATOF Streaming Exporter

Layer / File(s) Summary
Imports for TCP and concurrency
crates/core/src/observability/atof.rs
Adds networking and concurrency imports (TcpStream, Shutdown, mpsc, JoinHandle, Duration) required by the streaming exporter.
Stream-specific error variants
crates/core/src/observability/atof.rs
Extends AtofExporterError with ConnectStream { address, source }, ConfigureStream { address, operation, timeout, source }, and StoredStreamFailure { address, message }.
Streaming exporter implementation and API
crates/core/src/observability/atof.rs
Adds AtofStreamingExporterConfig, queue/message types, shared writer state, background writer thread, AtofStreamingExporter methods (new/connect/address/subscriber/register/deregister/force_flush/shutdown/stats), bounded mpsc queue, and writer error recording.
JSONL helpers and shared-state stats
crates/core/src/observability/atof.rs
Refactors JSONL helpers into serialize_event and write_serialized_event accepting &mut impl Write; adds shared-state helpers to store/clone last stream error and implements stats() snapshot.
Unit tests and socket sink helpers
crates/core/tests/unit/observability/atof_tests.rs
Adds TEST_RECV_TIMEOUT, two TCP sink helpers (start_atof_socket_sink, start_atof_eof_sink), and tests covering canonical delivery, connect/configure errors, shutdown/stored-error behavior, error precedence, and a runtime integration validating ordering and events_sent.
Streaming API documentation
docs/observability-plugin/atof.mdx
Adds "Streaming API" section documenting AtofStreamingExporter over TCP with a Rust usage example and a minimal TCP receiver example; notes receiver observes only future lifecycle events.
sequenceDiagram
  participant EventSource
  participant AtofStreamingExporter
  participant TcpWriter as WriterThread(BufWriter<TcpStream>)
  participant ReceiverProcess
  EventSource->>AtofStreamingExporter: emit Event (subscriber serializes -> JSON)
  AtofStreamingExporter->>AtofStreamingExporter: enqueue serialized JSON / Flush / Shutdown
  AtofStreamingExporter->>TcpWriter: background thread dequeues and writes JSONL
  TcpWriter->>ReceiverProcess: deliver newline-delimited JSONL over TCP
  ReceiverProcess->>ReceiverProcess: read line, parse to serde_json::Value
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~40 minutes

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 64.52% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Description check ✅ Passed The description covers the Overview (Summary/Why/How), implementation Details, verification steps, and breaking changes, though it omits explicit Related Issues section and checkboxes.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.
Title check ✅ Passed The title follows Conventional Commits format with allowed type 'feat', uses lowercase, includes a concise imperative summary, is 33 characters (under 72), has no trailing period, and accurately describes the main change: adding a streaming ATOF exporter.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown
Member

@willkill07 willkill07 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not quite the design I had envisioned. We need separate process exporting, not same process. This means establishing some type of HTTP or socket connection to transmit payloads in real-time.

Also, please adhere to the contributing guide and ensure all commits are signed-off

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@docs/observability-plugin/atof.mdx`:
- Around line 252-254: The receive loop exits on the first timeout; change it to
handle timeout by continuing the loop and only break on disconnection or error.
Replace the while-let pattern around
receiver.recv_timeout(Duration::from_millis(100)) with a loop that matches the
Result from receiver.recv_timeout, printing serde_json::to_string(&event)? on
Ok(event), continuing on RecvTimeoutError::Timeout (to allow idle gaps), and
breaking/logging on RecvTimeoutError::Disconnected or other Err variants so the
stream remains robust to short idle periods.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Enterprise

Run ID: f61590da-b947-438c-a7b6-f418bc2720f8

📥 Commits

Reviewing files that changed from the base of the PR and between 2d9b4f4 and 3043ee1.

📒 Files selected for processing (3)
  • crates/core/src/observability/atof.rs
  • crates/core/tests/unit/observability/atof_tests.rs
  • docs/observability-plugin/atof.mdx
📜 Review details
🧰 Additional context used
📓 Path-based instructions (21)
{docs/**,README.md,CONTRIBUTING.md}

📄 CodeRabbit inference engine (.agents/skills/validate-change/SKILL.md)

{docs/**,README.md,CONTRIBUTING.md}: For docs-only changes, run targeted checks only if commands, package names, or examples changed. Use just docs for docs-site builds and just docs-linkcheck when links changed
Run docs site build with just docs

Files:

  • docs/observability-plugin/atof.mdx
{docs/**,README.md,CONTRIBUTING.md,**/*.md}

📄 CodeRabbit inference engine (.agents/skills/validate-change/SKILL.md)

Run docs link validation with just docs-linkcheck when links change

Files:

  • docs/observability-plugin/atof.mdx
{docs/**,README.md}

📄 CodeRabbit inference engine (.agents/skills/validate-change/SKILL.md)

Verify README and docs entry points still match current package names and paths for large or public-facing changes

Files:

  • docs/observability-plugin/atof.mdx
{docs/**,examples/**,README.md}

📄 CodeRabbit inference engine (.agents/skills/validate-change/SKILL.md)

Verify examples still run with documented commands for large or public-facing changes

Files:

  • docs/observability-plugin/atof.mdx
{docs/**,README.md,**/Cargo.toml,**/package.json,**/*.md}

📄 CodeRabbit inference engine (.agents/skills/validate-change/SKILL.md)

Ensure renamed public surfaces are reflected consistently in manifests and docs for large or public-facing changes

Files:

  • docs/observability-plugin/atof.mdx
**/*.{md,mdx,py,sh,yaml,yml,toml,json}

📄 CodeRabbit inference engine (.agents/skills/contribute-docs/SKILL.md)

Keep package names, repo references, and build commands current

Files:

  • docs/observability-plugin/atof.mdx
**/*.mdx

📄 CodeRabbit inference engine (.agents/skills/contribute-docs/SKILL.md)

In MDX files, top-of-file comments must use JSX comment delimiters: {/* to open and */} to close. Do not use HTML comments for MDX SPDX headers.

MDX top-of-file SPDX comments must use {/* ... */} delimiters instead of HTML comment delimiters (Must-Fix)

Files:

  • docs/observability-plugin/atof.mdx
**/*.{html,md,mdx}

📄 CodeRabbit inference engine (CONTRIBUTING.md)

Include SPDX license header in HTML and Markdown files using HTML comment syntax

Files:

  • docs/observability-plugin/atof.mdx
docs/**/*.{md,mdx}

📄 CodeRabbit inference engine (CONTRIBUTING.md)

Update embedded documentation snippets, patch docs, and binding-support notes if examples or supported bindings changed

Files:

  • docs/observability-plugin/atof.mdx
docs/**

📄 CodeRabbit inference engine (CONTRIBUTING.md)

Run just docs or ./scripts/build-docs.sh html to regenerate ignored Fern API reference pages before validation for documentation site changes

Files:

  • docs/observability-plugin/atof.mdx
{docs/**,README.md,CONTRIBUTING.md,RELEASING.md,SECURITY.md}

⚙️ CodeRabbit configuration file

{docs/**,README.md,CONTRIBUTING.md,RELEASING.md,SECURITY.md}: Review documentation for technical accuracy against the current API, command correctness, and consistency across language bindings.
Flag stale examples, missing SPDX headers where required, and instructions that no longer match CI or pre-commit behavior.

Files:

  • docs/observability-plugin/atof.mdx
**/*.rs

📄 CodeRabbit inference engine (.agents/skills/add-binding-feature/SKILL.md)

Use snake_case naming convention for Rust identifiers (e.g., nemo_relay_tool_call)

**/*.rs: Any Rust change must run just test-rust
Any Rust change must run cargo fmt --all
Any Rust change must run cargo clippy --workspace --all-targets -- -D warnings

**/*.rs: Run cargo fmt --all for all FFI work since it is Rust work
Run just test-rust to validate FFI changes
Run cargo clippy --workspace --all-targets -- -D warnings to enforce strict linting on FFI work

When Rust files changed as part of Go work, also run cargo fmt --all, just test-rust, and cargo clippy --workspace --all-targets -- -D warnings

**/*.rs: Run cargo fmt --all when Rust files are changed as part of Node work
Run cargo clippy --workspace --all-targets -- -D warnings when Rust files are changed as part of Node work
Run just test-rust when Rust files are changed as part of Node work

**/*.rs: Run cargo fmt --all to format all Rust code
Run cargo clippy --workspace --all-targets -- -D warnings to enforce all clippy lints as errors

**/*.rs: Run cargo fmt --all when Rust files changed as part of WebAssembly work
Run cargo clippy --workspace --all-targets -- -D warnings when Rust files changed as part of WebAssembly work

**/*.rs: If any Rust code changed, always run just test-rust
If any Rust code changed, also run cargo fmt --all
If any Rust code changed, also run cargo clippy --workspace --all-targets -- -D warnings
Run Rust formatting with cargo fmt --all
Run Rust linting with cargo clippy --workspace --all-targets -- -D warnings

**/*.rs: Keep SPDX headers on Rust source files. The project is Apache-2.0.
Use snake_case for Rust binding naming conventions.
Use Json = serde_json::Value in Rust-facing runtime APIs where the existing code expects JSON payloads.
Use Result<T> with FlowError in core runtime paths. Keep errors explicit and binding-appropriate at the wrapper layer.
Preserve async behavior on the existing tokio-based model i...

Files:

  • crates/core/tests/unit/observability/atof_tests.rs
  • crates/core/src/observability/atof.rs
{crates/adaptive/**/*.rs,**/*test*.{rs,py,go,ts,js},**/*adaptive*test*.{rs,py,go,ts,js},docs/plugins/adaptive/**}

📄 CodeRabbit inference engine (.agents/skills/maintain-optimizer/SKILL.md)

Maintain documented and tested validation and report behavior for adaptive surfaces

Files:

  • crates/core/tests/unit/observability/atof_tests.rs
**/{Cargo.toml,**/*.rs}

📄 CodeRabbit inference engine (.agents/skills/maintain-packaging/SKILL.md)

Maintain consistency between Rust package names in Cargo.toml and their actual usage across the codebase

Files:

  • crates/core/tests/unit/observability/atof_tests.rs
  • crates/core/src/observability/atof.rs
**/*.{h,hpp,c,cpp,rs}

📄 CodeRabbit inference engine (.agents/skills/maintain-packaging/SKILL.md)

Ensure FFI header and library naming follows consistent conventions across platform-specific builds

Files:

  • crates/core/tests/unit/observability/atof_tests.rs
  • crates/core/src/observability/atof.rs
{crates/core,crates/adaptive}/**/*

📄 CodeRabbit inference engine (.agents/skills/prepare-pr/SKILL.md)

Changes to crates/core or crates/adaptive must run the full language matrix

Files:

  • crates/core/tests/unit/observability/atof_tests.rs
  • crates/core/src/observability/atof.rs
**/*.{rs,toml}

📄 CodeRabbit inference engine (.agents/skills/rename-surfaces/SKILL.md)

Update Rust crate names and module prefixes during coordinated rename operations

Files:

  • crates/core/tests/unit/observability/atof_tests.rs
  • crates/core/src/observability/atof.rs
crates/core/**/*.rs

📄 CodeRabbit inference engine (.agents/skills/test-go-binding/SKILL.md)

If the change touched crates/core or shared runtime semantics, also use validate-change for broader validation

Files:

  • crates/core/tests/unit/observability/atof_tests.rs
  • crates/core/src/observability/atof.rs
crates/{core,adaptive}/**

📄 CodeRabbit inference engine (.agents/skills/validate-change/SKILL.md)

If crates/core or crates/adaptive changed, run the full matrix across Rust, Python, Go, Node.js, and WebAssembly

Files:

  • crates/core/tests/unit/observability/atof_tests.rs
  • crates/core/src/observability/atof.rs
crates/{core,adaptive}/**/*.rs

⚙️ CodeRabbit configuration file

crates/{core,adaptive}/**/*.rs: Review the Rust runtime for async correctness, scope isolation, middleware ordering, and event lifecycle regressions.
Pay close attention to task-local/thread-local scope propagation, callback lifetimes, stream finalization, and root_uuid isolation.
Public API changes should preserve existing behavior unless tests and docs show the intended migration path.

Files:

  • crates/core/tests/unit/observability/atof_tests.rs
  • crates/core/src/observability/atof.rs
{crates/**/tests/**,python/tests/**,go/nemo_relay/**/*_test.go}

⚙️ CodeRabbit configuration file

{crates/**/tests/**,python/tests/**,go/nemo_relay/**/*_test.go}: Tests should cover the behavior promised by the changed API surface, including error paths and cross-request isolation where relevant.
Prefer assertions on lifecycle events, scope stacks, middleware ordering, and binding parity over shallow smoke tests.

Files:

  • crates/core/tests/unit/observability/atof_tests.rs
🔇 Additional comments (2)
crates/core/src/observability/atof.rs (1)

14-19: LGTM!

Also applies to: 230-334

crates/core/tests/unit/observability/atof_tests.rs (1)

17-17: LGTM!

Also applies to: 222-309

Comment thread docs/observability-plugin/atof.mdx Outdated
Comment on lines +252 to +254
while let Ok(event) = receiver.recv_timeout(Duration::from_millis(100)) {
println!("{}", serde_json::to_string(&event)?);
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Make the receive loop robust to idle gaps.

This loop exits on the first timeout, so it can stop streaming even when more events are expected shortly after.

Proposed doc fix
-while let Ok(event) = receiver.recv_timeout(Duration::from_millis(100)) {
-    println!("{}", serde_json::to_string(&event)?);
-}
+loop {
+    match receiver.recv_timeout(Duration::from_millis(100)) {
+        Ok(event) => println!("{}", serde_json::to_string(&event)?),
+        Err(std::sync::mpsc::RecvTimeoutError::Timeout) => continue,
+        Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => break,
+    }
+}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@docs/observability-plugin/atof.mdx` around lines 252 - 254, The receive loop
exits on the first timeout; change it to handle timeout by continuing the loop
and only break on disconnection or error. Replace the while-let pattern around
receiver.recv_timeout(Duration::from_millis(100)) with a loop that matches the
Result from receiver.recv_timeout, printing serde_json::to_string(&event)? on
Ok(event), continuing on RecvTimeoutError::Timeout (to allow idle gaps), and
breaking/logging on RecvTimeoutError::Disconnected or other Err variants so the
stream remains robust to short idle periods.

@OmkarMehta OmkarMehta force-pushed the feat/streaming-atof-exporter branch from 3043ee1 to 01d7bc2 Compare May 28, 2026 18:09
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
crates/core/src/observability/atof.rs (1)

1-3: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Run the required Rust validation matrix for this PR.

The PR notes only mention cargo fmt --package nemo-relay and cargo test -p nemo-relay, but this Rust change set requires the full mandated checks before merge.

As per coding guidelines, “If any Rust code changed, always run just test-rust”, “also run cargo fmt --all”, and “also run cargo clippy --workspace --all-targets -- -D warnings”.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@crates/core/src/observability/atof.rs` around lines 1 - 3, This PR touched
Rust sources (e.g., crates/core/src/observability/atof.rs) but the author only
ran package-scoped checks; re-run the full mandated Rust validation locally and
update the PR with results: run `just test-rust`, then `cargo fmt --all` (fix
formatting and stage changes), and run `cargo clippy --workspace --all-targets
-- -D warnings` (fix any clippy warnings), then re-run `cargo test` to confirm;
include the resulting CI/command output or pushed fixes in the PR so the full
validation matrix is satisfied.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@crates/core/src/observability/atof.rs`:
- Around line 377-385: shutdown() currently returns early on self.force_flush()?
so the TCP connection isn't closed when flush fails; change it to call
self.force_flush() and capture any error into a local variable (e.g.,
flush_err), then always proceed to lock self.state and call
state.writer.get_ref().shutdown(Shutdown::Both) (handling the lock with the
existing AtofExporterError::LockPoisoned mapping), and finally return the
original flush error if present or Ok(()) otherwise; reference the shutdown()
method, force_flush(), state.writer.get_ref().shutdown(Shutdown::Both), and
AtofExporterError::LockPoisoned to locate the code to modify.

In `@crates/core/tests/unit/observability/atof_tests.rs`:
- Line 255: The recv_timeout in the unit test uses a very short 250ms window
which causes flaky CI; update the calls to
receiver.recv_timeout(Duration::from_millis(250)) in atof_tests.rs (and the
other occurrence around the referenced line ~312) to use a larger, hardened
timeout (e.g., Duration::from_secs(2) or a named constant like
TEST_RECV_TIMEOUT) so cross-thread TCP delays in CI are tolerated; make the same
change for every receiver.recv_timeout call in that test file.

---

Outside diff comments:
In `@crates/core/src/observability/atof.rs`:
- Around line 1-3: This PR touched Rust sources (e.g.,
crates/core/src/observability/atof.rs) but the author only ran package-scoped
checks; re-run the full mandated Rust validation locally and update the PR with
results: run `just test-rust`, then `cargo fmt --all` (fix formatting and stage
changes), and run `cargo clippy --workspace --all-targets -- -D warnings` (fix
any clippy warnings), then re-run `cargo test` to confirm; include the resulting
CI/command output or pushed fixes in the PR so the full validation matrix is
satisfied.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Enterprise

Run ID: d8c6d3d1-4d51-4267-a46f-b7721733eec6

📥 Commits

Reviewing files that changed from the base of the PR and between 3043ee1 and 01d7bc2.

📒 Files selected for processing (3)
  • crates/core/src/observability/atof.rs
  • crates/core/tests/unit/observability/atof_tests.rs
  • docs/observability-plugin/atof.mdx
📜 Review details
🧰 Additional context used
📓 Path-based instructions (21)
**/*.rs

📄 CodeRabbit inference engine (.agents/skills/add-binding-feature/SKILL.md)

Use snake_case naming convention for Rust identifiers (e.g., nemo_relay_tool_call)

**/*.rs: Any Rust change must run just test-rust
Any Rust change must run cargo fmt --all
Any Rust change must run cargo clippy --workspace --all-targets -- -D warnings

**/*.rs: Run cargo fmt --all for all FFI work since it is Rust work
Run just test-rust to validate FFI changes
Run cargo clippy --workspace --all-targets -- -D warnings to enforce strict linting on FFI work

When Rust files changed as part of Go work, also run cargo fmt --all, just test-rust, and cargo clippy --workspace --all-targets -- -D warnings

**/*.rs: Run cargo fmt --all when Rust files are changed as part of Node work
Run cargo clippy --workspace --all-targets -- -D warnings when Rust files are changed as part of Node work
Run just test-rust when Rust files are changed as part of Node work

**/*.rs: Run cargo fmt --all to format all Rust code
Run cargo clippy --workspace --all-targets -- -D warnings to enforce all clippy lints as errors

**/*.rs: Run cargo fmt --all when Rust files changed as part of WebAssembly work
Run cargo clippy --workspace --all-targets -- -D warnings when Rust files changed as part of WebAssembly work

**/*.rs: If any Rust code changed, always run just test-rust
If any Rust code changed, also run cargo fmt --all
If any Rust code changed, also run cargo clippy --workspace --all-targets -- -D warnings
Run Rust formatting with cargo fmt --all
Run Rust linting with cargo clippy --workspace --all-targets -- -D warnings

**/*.rs: Keep SPDX headers on Rust source files. The project is Apache-2.0.
Use snake_case for Rust binding naming conventions.
Use Json = serde_json::Value in Rust-facing runtime APIs where the existing code expects JSON payloads.
Use Result<T> with FlowError in core runtime paths. Keep errors explicit and binding-appropriate at the wrapper layer.
Preserve async behavior on the existing tokio-based model i...

Files:

  • crates/core/tests/unit/observability/atof_tests.rs
  • crates/core/src/observability/atof.rs
{crates/adaptive/**/*.rs,**/*test*.{rs,py,go,ts,js},**/*adaptive*test*.{rs,py,go,ts,js},docs/plugins/adaptive/**}

📄 CodeRabbit inference engine (.agents/skills/maintain-optimizer/SKILL.md)

Maintain documented and tested validation and report behavior for adaptive surfaces

Files:

  • crates/core/tests/unit/observability/atof_tests.rs
**/{Cargo.toml,**/*.rs}

📄 CodeRabbit inference engine (.agents/skills/maintain-packaging/SKILL.md)

Maintain consistency between Rust package names in Cargo.toml and their actual usage across the codebase

Files:

  • crates/core/tests/unit/observability/atof_tests.rs
  • crates/core/src/observability/atof.rs
**/*.{h,hpp,c,cpp,rs}

📄 CodeRabbit inference engine (.agents/skills/maintain-packaging/SKILL.md)

Ensure FFI header and library naming follows consistent conventions across platform-specific builds

Files:

  • crates/core/tests/unit/observability/atof_tests.rs
  • crates/core/src/observability/atof.rs
{crates/core,crates/adaptive}/**/*

📄 CodeRabbit inference engine (.agents/skills/prepare-pr/SKILL.md)

Changes to crates/core or crates/adaptive must run the full language matrix

Files:

  • crates/core/tests/unit/observability/atof_tests.rs
  • crates/core/src/observability/atof.rs
**/*.{rs,toml}

📄 CodeRabbit inference engine (.agents/skills/rename-surfaces/SKILL.md)

Update Rust crate names and module prefixes during coordinated rename operations

Files:

  • crates/core/tests/unit/observability/atof_tests.rs
  • crates/core/src/observability/atof.rs
crates/core/**/*.rs

📄 CodeRabbit inference engine (.agents/skills/test-go-binding/SKILL.md)

If the change touched crates/core or shared runtime semantics, also use validate-change for broader validation

Files:

  • crates/core/tests/unit/observability/atof_tests.rs
  • crates/core/src/observability/atof.rs
crates/{core,adaptive}/**

📄 CodeRabbit inference engine (.agents/skills/validate-change/SKILL.md)

If crates/core or crates/adaptive changed, run the full matrix across Rust, Python, Go, Node.js, and WebAssembly

Files:

  • crates/core/tests/unit/observability/atof_tests.rs
  • crates/core/src/observability/atof.rs
crates/{core,adaptive}/**/*.rs

⚙️ CodeRabbit configuration file

crates/{core,adaptive}/**/*.rs: Review the Rust runtime for async correctness, scope isolation, middleware ordering, and event lifecycle regressions.
Pay close attention to task-local/thread-local scope propagation, callback lifetimes, stream finalization, and root_uuid isolation.
Public API changes should preserve existing behavior unless tests and docs show the intended migration path.

Files:

  • crates/core/tests/unit/observability/atof_tests.rs
  • crates/core/src/observability/atof.rs
{crates/**/tests/**,python/tests/**,go/nemo_relay/**/*_test.go}

⚙️ CodeRabbit configuration file

{crates/**/tests/**,python/tests/**,go/nemo_relay/**/*_test.go}: Tests should cover the behavior promised by the changed API surface, including error paths and cross-request isolation where relevant.
Prefer assertions on lifecycle events, scope stacks, middleware ordering, and binding parity over shallow smoke tests.

Files:

  • crates/core/tests/unit/observability/atof_tests.rs
{docs/**,README.md,CONTRIBUTING.md}

📄 CodeRabbit inference engine (.agents/skills/validate-change/SKILL.md)

{docs/**,README.md,CONTRIBUTING.md}: For docs-only changes, run targeted checks only if commands, package names, or examples changed. Use just docs for docs-site builds and just docs-linkcheck when links changed
Run docs site build with just docs

Files:

  • docs/observability-plugin/atof.mdx
{docs/**,README.md,CONTRIBUTING.md,**/*.md}

📄 CodeRabbit inference engine (.agents/skills/validate-change/SKILL.md)

Run docs link validation with just docs-linkcheck when links change

Files:

  • docs/observability-plugin/atof.mdx
{docs/**,README.md}

📄 CodeRabbit inference engine (.agents/skills/validate-change/SKILL.md)

Verify README and docs entry points still match current package names and paths for large or public-facing changes

Files:

  • docs/observability-plugin/atof.mdx
{docs/**,examples/**,README.md}

📄 CodeRabbit inference engine (.agents/skills/validate-change/SKILL.md)

Verify examples still run with documented commands for large or public-facing changes

Files:

  • docs/observability-plugin/atof.mdx
{docs/**,README.md,**/Cargo.toml,**/package.json,**/*.md}

📄 CodeRabbit inference engine (.agents/skills/validate-change/SKILL.md)

Ensure renamed public surfaces are reflected consistently in manifests and docs for large or public-facing changes

Files:

  • docs/observability-plugin/atof.mdx
**/*.{md,mdx,py,sh,yaml,yml,toml,json}

📄 CodeRabbit inference engine (.agents/skills/contribute-docs/SKILL.md)

Keep package names, repo references, and build commands current

Files:

  • docs/observability-plugin/atof.mdx
**/*.mdx

📄 CodeRabbit inference engine (.agents/skills/contribute-docs/SKILL.md)

In MDX files, top-of-file comments must use JSX comment delimiters: {/* to open and */} to close. Do not use HTML comments for MDX SPDX headers.

MDX top-of-file SPDX comments must use {/* ... */} delimiters instead of HTML comment delimiters (Must-Fix)

Files:

  • docs/observability-plugin/atof.mdx
**/*.{html,md,mdx}

📄 CodeRabbit inference engine (CONTRIBUTING.md)

Include SPDX license header in HTML and Markdown files using HTML comment syntax

Files:

  • docs/observability-plugin/atof.mdx
docs/**/*.{md,mdx}

📄 CodeRabbit inference engine (CONTRIBUTING.md)

Update embedded documentation snippets, patch docs, and binding-support notes if examples or supported bindings changed

Files:

  • docs/observability-plugin/atof.mdx
docs/**

📄 CodeRabbit inference engine (CONTRIBUTING.md)

Run just docs or ./scripts/build-docs.sh html to regenerate ignored Fern API reference pages before validation for documentation site changes

Files:

  • docs/observability-plugin/atof.mdx
{docs/**,README.md,CONTRIBUTING.md,RELEASING.md,SECURITY.md}

⚙️ CodeRabbit configuration file

{docs/**,README.md,CONTRIBUTING.md,RELEASING.md,SECURITY.md}: Review documentation for technical accuracy against the current API, command correctness, and consistency across language bindings.
Flag stale examples, missing SPDX headers where required, and instructions that no longer match CI or pre-commit behavior.

Files:

  • docs/observability-plugin/atof.mdx
🔇 Additional comments (1)
docs/observability-plugin/atof.mdx (1)

233-263: LGTM!

Comment thread crates/core/src/observability/atof.rs Outdated
Comment thread crates/core/tests/unit/observability/atof_tests.rs Outdated
@OmkarMehta OmkarMehta force-pushed the feat/streaming-atof-exporter branch from 01d7bc2 to 665eabb Compare May 28, 2026 18:22
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@crates/core/src/observability/atof.rs`:
- Around line 253-407: This PR touches crates/core (e.g., AtofStreamingExporter
in AtofStreamingExporterConfig/AtofStreamingExporterState), so run the full
required Rust and cross-language validation: run `just test-rust`, `cargo fmt
--all`, and `cargo clippy --workspace --all-targets -- -D warnings`; also run
the full cross-language matrix required for crates/core (Rust, Python, Go,
Node.js, WebAssembly) and update the PR description to list these verification
steps and their results before merge.
- Around line 327-342: The subscriber closure in AtofStreamingExporter (the
subscriber() function) is performing blocking per-event writes via
write_event(&mut state.writer, event) which calls write_all + flush and can
stall event delivery; instead, move IO off the synchronous subscriber path by
enqueuing serialized events into a bounded in-memory buffer/queue (owned by
state) and spawn a dedicated writer task/thread that drains that queue and
performs write_all/flush with a socket write timeout or non-blocking mode;
additionally set a socket write timeout (set_write_timeout) or use non-blocking
writes with retry/backoff in the writer task and record errors into
state.last_error; update subscriber() to push into the queue and return
immediately rather than calling write_event directly.

In `@docs/observability-plugin/atof.mdx`:
- Around line 233-263: This docs change adds a new Streaming API example for
AtofStreamingExporter in docs/observability-plugin/atof.mdx; run the docs-site
validation and regenerate ignored Fern API reference pages before merging by
executing the project's docs build (either just docs or ./scripts/build-docs.sh
html) and, if any links were modified, run just docs-linkcheck; ensure the
generated site builds cleanly and that the example compiles/links correctly in
the API reference for AtofStreamingExporter/AtofStreamingExporterConfig.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Enterprise

Run ID: 1bbb4c67-4ed5-4ee3-82c8-dc76a3a4b517

📥 Commits

Reviewing files that changed from the base of the PR and between 01d7bc2 and 665eabb.

📒 Files selected for processing (3)
  • crates/core/src/observability/atof.rs
  • crates/core/tests/unit/observability/atof_tests.rs
  • docs/observability-plugin/atof.mdx
📜 Review details
🧰 Additional context used
📓 Path-based instructions (21)
{docs/**,README.md,CONTRIBUTING.md}

📄 CodeRabbit inference engine (.agents/skills/validate-change/SKILL.md)

{docs/**,README.md,CONTRIBUTING.md}: For docs-only changes, run targeted checks only if commands, package names, or examples changed. Use just docs for docs-site builds and just docs-linkcheck when links changed
Run docs site build with just docs

Files:

  • docs/observability-plugin/atof.mdx
{docs/**,README.md,CONTRIBUTING.md,**/*.md}

📄 CodeRabbit inference engine (.agents/skills/validate-change/SKILL.md)

Run docs link validation with just docs-linkcheck when links change

Files:

  • docs/observability-plugin/atof.mdx
{docs/**,README.md}

📄 CodeRabbit inference engine (.agents/skills/validate-change/SKILL.md)

Verify README and docs entry points still match current package names and paths for large or public-facing changes

Files:

  • docs/observability-plugin/atof.mdx
{docs/**,examples/**,README.md}

📄 CodeRabbit inference engine (.agents/skills/validate-change/SKILL.md)

Verify examples still run with documented commands for large or public-facing changes

Files:

  • docs/observability-plugin/atof.mdx
{docs/**,README.md,**/Cargo.toml,**/package.json,**/*.md}

📄 CodeRabbit inference engine (.agents/skills/validate-change/SKILL.md)

Ensure renamed public surfaces are reflected consistently in manifests and docs for large or public-facing changes

Files:

  • docs/observability-plugin/atof.mdx
**/*.{md,mdx,py,sh,yaml,yml,toml,json}

📄 CodeRabbit inference engine (.agents/skills/contribute-docs/SKILL.md)

Keep package names, repo references, and build commands current

Files:

  • docs/observability-plugin/atof.mdx
**/*.mdx

📄 CodeRabbit inference engine (.agents/skills/contribute-docs/SKILL.md)

In MDX files, top-of-file comments must use JSX comment delimiters: {/* to open and */} to close. Do not use HTML comments for MDX SPDX headers.

MDX top-of-file SPDX comments must use {/* ... */} delimiters instead of HTML comment delimiters (Must-Fix)

Files:

  • docs/observability-plugin/atof.mdx
**/*.{html,md,mdx}

📄 CodeRabbit inference engine (CONTRIBUTING.md)

Include SPDX license header in HTML and Markdown files using HTML comment syntax

Files:

  • docs/observability-plugin/atof.mdx
docs/**/*.{md,mdx}

📄 CodeRabbit inference engine (CONTRIBUTING.md)

Update embedded documentation snippets, patch docs, and binding-support notes if examples or supported bindings changed

Files:

  • docs/observability-plugin/atof.mdx
docs/**

📄 CodeRabbit inference engine (CONTRIBUTING.md)

Run just docs or ./scripts/build-docs.sh html to regenerate ignored Fern API reference pages before validation for documentation site changes

Files:

  • docs/observability-plugin/atof.mdx
{docs/**,README.md,CONTRIBUTING.md,RELEASING.md,SECURITY.md}

⚙️ CodeRabbit configuration file

{docs/**,README.md,CONTRIBUTING.md,RELEASING.md,SECURITY.md}: Review documentation for technical accuracy against the current API, command correctness, and consistency across language bindings.
Flag stale examples, missing SPDX headers where required, and instructions that no longer match CI or pre-commit behavior.

Files:

  • docs/observability-plugin/atof.mdx
**/*.rs

📄 CodeRabbit inference engine (.agents/skills/add-binding-feature/SKILL.md)

Use snake_case naming convention for Rust identifiers (e.g., nemo_relay_tool_call)

**/*.rs: Any Rust change must run just test-rust
Any Rust change must run cargo fmt --all
Any Rust change must run cargo clippy --workspace --all-targets -- -D warnings

**/*.rs: Run cargo fmt --all for all FFI work since it is Rust work
Run just test-rust to validate FFI changes
Run cargo clippy --workspace --all-targets -- -D warnings to enforce strict linting on FFI work

When Rust files changed as part of Go work, also run cargo fmt --all, just test-rust, and cargo clippy --workspace --all-targets -- -D warnings

**/*.rs: Run cargo fmt --all when Rust files are changed as part of Node work
Run cargo clippy --workspace --all-targets -- -D warnings when Rust files are changed as part of Node work
Run just test-rust when Rust files are changed as part of Node work

**/*.rs: Run cargo fmt --all to format all Rust code
Run cargo clippy --workspace --all-targets -- -D warnings to enforce all clippy lints as errors

**/*.rs: Run cargo fmt --all when Rust files changed as part of WebAssembly work
Run cargo clippy --workspace --all-targets -- -D warnings when Rust files changed as part of WebAssembly work

**/*.rs: If any Rust code changed, always run just test-rust
If any Rust code changed, also run cargo fmt --all
If any Rust code changed, also run cargo clippy --workspace --all-targets -- -D warnings
Run Rust formatting with cargo fmt --all
Run Rust linting with cargo clippy --workspace --all-targets -- -D warnings

**/*.rs: Keep SPDX headers on Rust source files. The project is Apache-2.0.
Use snake_case for Rust binding naming conventions.
Use Json = serde_json::Value in Rust-facing runtime APIs where the existing code expects JSON payloads.
Use Result<T> with FlowError in core runtime paths. Keep errors explicit and binding-appropriate at the wrapper layer.
Preserve async behavior on the existing tokio-based model i...

Files:

  • crates/core/tests/unit/observability/atof_tests.rs
  • crates/core/src/observability/atof.rs
{crates/adaptive/**/*.rs,**/*test*.{rs,py,go,ts,js},**/*adaptive*test*.{rs,py,go,ts,js},docs/plugins/adaptive/**}

📄 CodeRabbit inference engine (.agents/skills/maintain-optimizer/SKILL.md)

Maintain documented and tested validation and report behavior for adaptive surfaces

Files:

  • crates/core/tests/unit/observability/atof_tests.rs
**/{Cargo.toml,**/*.rs}

📄 CodeRabbit inference engine (.agents/skills/maintain-packaging/SKILL.md)

Maintain consistency between Rust package names in Cargo.toml and their actual usage across the codebase

Files:

  • crates/core/tests/unit/observability/atof_tests.rs
  • crates/core/src/observability/atof.rs
**/*.{h,hpp,c,cpp,rs}

📄 CodeRabbit inference engine (.agents/skills/maintain-packaging/SKILL.md)

Ensure FFI header and library naming follows consistent conventions across platform-specific builds

Files:

  • crates/core/tests/unit/observability/atof_tests.rs
  • crates/core/src/observability/atof.rs
{crates/core,crates/adaptive}/**/*

📄 CodeRabbit inference engine (.agents/skills/prepare-pr/SKILL.md)

Changes to crates/core or crates/adaptive must run the full language matrix

Files:

  • crates/core/tests/unit/observability/atof_tests.rs
  • crates/core/src/observability/atof.rs
**/*.{rs,toml}

📄 CodeRabbit inference engine (.agents/skills/rename-surfaces/SKILL.md)

Update Rust crate names and module prefixes during coordinated rename operations

Files:

  • crates/core/tests/unit/observability/atof_tests.rs
  • crates/core/src/observability/atof.rs
crates/core/**/*.rs

📄 CodeRabbit inference engine (.agents/skills/test-go-binding/SKILL.md)

If the change touched crates/core or shared runtime semantics, also use validate-change for broader validation

Files:

  • crates/core/tests/unit/observability/atof_tests.rs
  • crates/core/src/observability/atof.rs
crates/{core,adaptive}/**

📄 CodeRabbit inference engine (.agents/skills/validate-change/SKILL.md)

If crates/core or crates/adaptive changed, run the full matrix across Rust, Python, Go, Node.js, and WebAssembly

Files:

  • crates/core/tests/unit/observability/atof_tests.rs
  • crates/core/src/observability/atof.rs
crates/{core,adaptive}/**/*.rs

⚙️ CodeRabbit configuration file

crates/{core,adaptive}/**/*.rs: Review the Rust runtime for async correctness, scope isolation, middleware ordering, and event lifecycle regressions.
Pay close attention to task-local/thread-local scope propagation, callback lifetimes, stream finalization, and root_uuid isolation.
Public API changes should preserve existing behavior unless tests and docs show the intended migration path.

Files:

  • crates/core/tests/unit/observability/atof_tests.rs
  • crates/core/src/observability/atof.rs
{crates/**/tests/**,python/tests/**,go/nemo_relay/**/*_test.go}

⚙️ CodeRabbit configuration file

{crates/**/tests/**,python/tests/**,go/nemo_relay/**/*_test.go}: Tests should cover the behavior promised by the changed API surface, including error paths and cross-request isolation where relevant.
Prefer assertions on lifecycle events, scope stacks, middleware ordering, and binding parity over shallow smoke tests.

Files:

  • crates/core/tests/unit/observability/atof_tests.rs
🔇 Additional comments (1)
crates/core/tests/unit/observability/atof_tests.rs (1)

25-25: LGTM!

Also applies to: 121-153, 262-355

Comment on lines +253 to +407
/// Configuration for [`AtofStreamingExporter`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AtofStreamingExporterConfig {
/// TCP address for a separate local process that receives ATOF JSONL events.
pub address: String,
}

impl AtofStreamingExporterConfig {
/// Create a streaming exporter config for the given TCP address.
pub fn new(address: impl Into<String>) -> Self {
Self {
address: address.into(),
}
}
}

struct AtofStreamingExporterState {
writer: BufWriter<TcpStream>,
events_sent: u64,
last_error: Option<String>,
}

/// Snapshot of [`AtofStreamingExporter`] delivery state.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct AtofStreamingExporterStats {
/// Number of ATOF events observed by the streaming exporter.
pub events_sent: u64,
/// Most recent serialization or exporter state error, if one was recorded.
pub last_error: Option<String>,
}

/// TCP-backed Agent Trajectory Observability Format (ATOF) event stream exporter.
///
/// The exporter exposes a regular NeMo Relay event subscriber and writes each
/// canonical ATOF JSON value as one JSONL line to a separate local process over
/// a TCP connection. A local UI, CLI, or bridge process can own the receiving
/// socket and fan events out over HTTP, SSE, WebSocket, stdout, or another
/// transport without redefining the ATOF event contract.
#[derive(Clone)]
pub struct AtofStreamingExporter {
address: String,
state: Arc<Mutex<AtofStreamingExporterState>>,
}

impl AtofStreamingExporter {
/// Connect to a separate local ATOF stream receiver.
pub fn new(config: AtofStreamingExporterConfig) -> Result<Self> {
let address = config.address;
let stream =
TcpStream::connect(&address).map_err(|source| AtofExporterError::ConnectStream {
address: address.clone(),
source,
})?;
let _ = stream.set_nodelay(true);
Ok(Self {
address,
state: Arc::new(Mutex::new(AtofStreamingExporterState {
writer: BufWriter::new(stream),
events_sent: 0,
last_error: None,
})),
})
}

/// Connect to a separate local ATOF stream receiver at the given TCP address.
pub fn connect(address: impl Into<String>) -> Result<Self> {
Self::new(AtofStreamingExporterConfig::new(address))
}

/// Return the connected stream receiver address.
pub fn address(&self) -> &str {
&self.address
}

/// Return an event subscriber that writes one canonical JSONL record per event.
pub fn subscriber(&self) -> EventSubscriberFn {
let state = Arc::clone(&self.state);
Arc::new(move |event: &Event| {
let Ok(mut state) = state.lock() else {
return;
};
if state.last_error.is_some() {
return;
}
if let Err(error) = write_event(&mut state.writer, event) {
state.last_error = Some(error);
return;
}
state.events_sent += 1;
})
}

/// Register this streaming exporter globally under the given subscriber name.
pub fn register(&self, name: &str) -> Result<()> {
register_subscriber(name, self.subscriber()).map_err(Into::into)
}

/// Deregister a global subscriber by name.
pub fn deregister(&self, name: &str) -> Result<bool> {
deregister_subscriber(name).map_err(Into::into)
}

/// Flush the stream and report any stored write error.
pub fn force_flush(&self) -> Result<()> {
let mut state = self
.state
.lock()
.map_err(|_| AtofExporterError::LockPoisoned)?;
state
.writer
.flush()
.map_err(|source| AtofExporterError::FlushStream {
address: self.address.clone(),
source,
})?;
if let Some(message) = &state.last_error {
return Err(AtofExporterError::StoredStreamFailure {
address: self.address.clone(),
message: message.clone(),
});
}
Ok(())
}

/// Shut down the stream by flushing and closing the TCP connection.
pub fn shutdown(&self) -> Result<()> {
let flush_result = self.force_flush();
let shutdown_result = self
.state
.lock()
.map_err(|_| AtofExporterError::LockPoisoned)
.map(|state| {
let _ = state.writer.get_ref().shutdown(Shutdown::Both);
});
match (flush_result, shutdown_result) {
(Err(error), _) => Err(error),
(Ok(()), Err(error)) => Err(error),
(Ok(()), Ok(())) => Ok(()),
}
}

/// Return a point-in-time delivery snapshot for diagnostics and tests.
pub fn stats(&self) -> AtofStreamingExporterStats {
let Ok(state) = self.state.lock() else {
return AtofStreamingExporterStats {
last_error: Some("the ATOF streaming exporter state lock was poisoned".to_string()),
..AtofStreamingExporterStats::default()
};
};
AtofStreamingExporterStats {
events_sent: state.events_sent,
last_error: state.last_error.clone(),
}
}
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Run the required Rust/core validation matrix before merge.

The recorded verification only mentions cargo fmt --package nemo-relay and cargo test -p nemo-relay; this change set touches crates/core and needs the stricter required checks.

As per coding guidelines "Any Rust change must run just test-rust", "Any Rust change must run cargo fmt --all", "Any Rust change must run cargo clippy --workspace --all-targets -- -D warnings", and "crates/{core,adaptive}/**: If crates/core or crates/adaptive changed, run the full matrix across Rust, Python, Go, Node.js, and WebAssembly".

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@crates/core/src/observability/atof.rs` around lines 253 - 407, This PR
touches crates/core (e.g., AtofStreamingExporter in
AtofStreamingExporterConfig/AtofStreamingExporterState), so run the full
required Rust and cross-language validation: run `just test-rust`, `cargo fmt
--all`, and `cargo clippy --workspace --all-targets -- -D warnings`; also run
the full cross-language matrix required for crates/core (Rust, Python, Go,
Node.js, WebAssembly) and update the PR description to list these verification
steps and their results before merge.

Comment thread crates/core/src/observability/atof.rs
Comment thread docs/observability-plugin/atof.mdx
@OmkarMehta OmkarMehta force-pushed the feat/streaming-atof-exporter branch from 665eabb to df40fcf Compare May 28, 2026 18:42
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@crates/core/src/observability/atof.rs`:
- Around line 565-568: The store_stream_error function currently always replaces
the stored error; change it to preserve the first error by only setting the
Option<String> when it is None (e.g., acquire the Arc<Mutex<Option<String>>>
lock in store_stream_error and call get_or_insert or check is_none before
assigning), so subsequent failures do not overwrite the initially recorded error
(target the function named store_stream_error and the Arc<Mutex<Option<String>>>
state).
- Around line 59-66: The FlushStream error variant in the AtofExporterError enum
is never constructed; either remove it or wire it into the flush path—simplest
fix is to delete the FlushStream variant from
crates/core/src/observability/atof.rs and any formatting/display uses, since
flush failures are already captured and returned as
AtofExporterError::StoredStreamFailure by
AtofStreamMessage::Flush/AtofStreamMessage::Shutdown through
force_flush()/shutdown(); after removing, run cargo build/tests to ensure no
remaining references to FlushStream remain and update any pattern matches that
referenced it to handle StoredStreamFailure instead.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Enterprise

Run ID: 8a1a5ebb-c39a-43c3-abeb-f249eca137c2

📥 Commits

Reviewing files that changed from the base of the PR and between 665eabb and df40fcf.

📒 Files selected for processing (3)
  • crates/core/src/observability/atof.rs
  • crates/core/tests/unit/observability/atof_tests.rs
  • docs/observability-plugin/atof.mdx
📜 Review details
🧰 Additional context used
📓 Path-based instructions (21)
{docs/**,README.md,CONTRIBUTING.md}

📄 CodeRabbit inference engine (.agents/skills/validate-change/SKILL.md)

{docs/**,README.md,CONTRIBUTING.md}: For docs-only changes, run targeted checks only if commands, package names, or examples changed. Use just docs for docs-site builds and just docs-linkcheck when links changed
Run docs site build with just docs

Files:

  • docs/observability-plugin/atof.mdx
{docs/**,README.md,CONTRIBUTING.md,**/*.md}

📄 CodeRabbit inference engine (.agents/skills/validate-change/SKILL.md)

Run docs link validation with just docs-linkcheck when links change

Files:

  • docs/observability-plugin/atof.mdx
{docs/**,README.md}

📄 CodeRabbit inference engine (.agents/skills/validate-change/SKILL.md)

Verify README and docs entry points still match current package names and paths for large or public-facing changes

Files:

  • docs/observability-plugin/atof.mdx
{docs/**,examples/**,README.md}

📄 CodeRabbit inference engine (.agents/skills/validate-change/SKILL.md)

Verify examples still run with documented commands for large or public-facing changes

Files:

  • docs/observability-plugin/atof.mdx
{docs/**,README.md,**/Cargo.toml,**/package.json,**/*.md}

📄 CodeRabbit inference engine (.agents/skills/validate-change/SKILL.md)

Ensure renamed public surfaces are reflected consistently in manifests and docs for large or public-facing changes

Files:

  • docs/observability-plugin/atof.mdx
**/*.{md,mdx,py,sh,yaml,yml,toml,json}

📄 CodeRabbit inference engine (.agents/skills/contribute-docs/SKILL.md)

Keep package names, repo references, and build commands current

Files:

  • docs/observability-plugin/atof.mdx
**/*.mdx

📄 CodeRabbit inference engine (.agents/skills/contribute-docs/SKILL.md)

In MDX files, top-of-file comments must use JSX comment delimiters: {/* to open and */} to close. Do not use HTML comments for MDX SPDX headers.

MDX top-of-file SPDX comments must use {/* ... */} delimiters instead of HTML comment delimiters (Must-Fix)

Files:

  • docs/observability-plugin/atof.mdx
**/*.{html,md,mdx}

📄 CodeRabbit inference engine (CONTRIBUTING.md)

Include SPDX license header in HTML and Markdown files using HTML comment syntax

Files:

  • docs/observability-plugin/atof.mdx
docs/**/*.{md,mdx}

📄 CodeRabbit inference engine (CONTRIBUTING.md)

Update embedded documentation snippets, patch docs, and binding-support notes if examples or supported bindings changed

Files:

  • docs/observability-plugin/atof.mdx
docs/**

📄 CodeRabbit inference engine (CONTRIBUTING.md)

Run just docs or ./scripts/build-docs.sh html to regenerate ignored Fern API reference pages before validation for documentation site changes

Files:

  • docs/observability-plugin/atof.mdx
{docs/**,README.md,CONTRIBUTING.md,RELEASING.md,SECURITY.md}

⚙️ CodeRabbit configuration file

{docs/**,README.md,CONTRIBUTING.md,RELEASING.md,SECURITY.md}: Review documentation for technical accuracy against the current API, command correctness, and consistency across language bindings.
Flag stale examples, missing SPDX headers where required, and instructions that no longer match CI or pre-commit behavior.

Files:

  • docs/observability-plugin/atof.mdx
**/*.rs

📄 CodeRabbit inference engine (.agents/skills/add-binding-feature/SKILL.md)

Use snake_case naming convention for Rust identifiers (e.g., nemo_relay_tool_call)

**/*.rs: Any Rust change must run just test-rust
Any Rust change must run cargo fmt --all
Any Rust change must run cargo clippy --workspace --all-targets -- -D warnings

**/*.rs: Run cargo fmt --all for all FFI work since it is Rust work
Run just test-rust to validate FFI changes
Run cargo clippy --workspace --all-targets -- -D warnings to enforce strict linting on FFI work

When Rust files changed as part of Go work, also run cargo fmt --all, just test-rust, and cargo clippy --workspace --all-targets -- -D warnings

**/*.rs: Run cargo fmt --all when Rust files are changed as part of Node work
Run cargo clippy --workspace --all-targets -- -D warnings when Rust files are changed as part of Node work
Run just test-rust when Rust files are changed as part of Node work

**/*.rs: Run cargo fmt --all to format all Rust code
Run cargo clippy --workspace --all-targets -- -D warnings to enforce all clippy lints as errors

**/*.rs: Run cargo fmt --all when Rust files changed as part of WebAssembly work
Run cargo clippy --workspace --all-targets -- -D warnings when Rust files changed as part of WebAssembly work

**/*.rs: If any Rust code changed, always run just test-rust
If any Rust code changed, also run cargo fmt --all
If any Rust code changed, also run cargo clippy --workspace --all-targets -- -D warnings
Run Rust formatting with cargo fmt --all
Run Rust linting with cargo clippy --workspace --all-targets -- -D warnings

**/*.rs: Keep SPDX headers on Rust source files. The project is Apache-2.0.
Use snake_case for Rust binding naming conventions.
Use Json = serde_json::Value in Rust-facing runtime APIs where the existing code expects JSON payloads.
Use Result<T> with FlowError in core runtime paths. Keep errors explicit and binding-appropriate at the wrapper layer.
Preserve async behavior on the existing tokio-based model i...

Files:

  • crates/core/tests/unit/observability/atof_tests.rs
  • crates/core/src/observability/atof.rs
{crates/adaptive/**/*.rs,**/*test*.{rs,py,go,ts,js},**/*adaptive*test*.{rs,py,go,ts,js},docs/plugins/adaptive/**}

📄 CodeRabbit inference engine (.agents/skills/maintain-optimizer/SKILL.md)

Maintain documented and tested validation and report behavior for adaptive surfaces

Files:

  • crates/core/tests/unit/observability/atof_tests.rs
**/{Cargo.toml,**/*.rs}

📄 CodeRabbit inference engine (.agents/skills/maintain-packaging/SKILL.md)

Maintain consistency between Rust package names in Cargo.toml and their actual usage across the codebase

Files:

  • crates/core/tests/unit/observability/atof_tests.rs
  • crates/core/src/observability/atof.rs
**/*.{h,hpp,c,cpp,rs}

📄 CodeRabbit inference engine (.agents/skills/maintain-packaging/SKILL.md)

Ensure FFI header and library naming follows consistent conventions across platform-specific builds

Files:

  • crates/core/tests/unit/observability/atof_tests.rs
  • crates/core/src/observability/atof.rs
{crates/core,crates/adaptive}/**/*

📄 CodeRabbit inference engine (.agents/skills/prepare-pr/SKILL.md)

Changes to crates/core or crates/adaptive must run the full language matrix

Files:

  • crates/core/tests/unit/observability/atof_tests.rs
  • crates/core/src/observability/atof.rs
**/*.{rs,toml}

📄 CodeRabbit inference engine (.agents/skills/rename-surfaces/SKILL.md)

Update Rust crate names and module prefixes during coordinated rename operations

Files:

  • crates/core/tests/unit/observability/atof_tests.rs
  • crates/core/src/observability/atof.rs
crates/core/**/*.rs

📄 CodeRabbit inference engine (.agents/skills/test-go-binding/SKILL.md)

If the change touched crates/core or shared runtime semantics, also use validate-change for broader validation

Files:

  • crates/core/tests/unit/observability/atof_tests.rs
  • crates/core/src/observability/atof.rs
crates/{core,adaptive}/**

📄 CodeRabbit inference engine (.agents/skills/validate-change/SKILL.md)

If crates/core or crates/adaptive changed, run the full matrix across Rust, Python, Go, Node.js, and WebAssembly

Files:

  • crates/core/tests/unit/observability/atof_tests.rs
  • crates/core/src/observability/atof.rs
crates/{core,adaptive}/**/*.rs

⚙️ CodeRabbit configuration file

crates/{core,adaptive}/**/*.rs: Review the Rust runtime for async correctness, scope isolation, middleware ordering, and event lifecycle regressions.
Pay close attention to task-local/thread-local scope propagation, callback lifetimes, stream finalization, and root_uuid isolation.
Public API changes should preserve existing behavior unless tests and docs show the intended migration path.

Files:

  • crates/core/tests/unit/observability/atof_tests.rs
  • crates/core/src/observability/atof.rs
{crates/**/tests/**,python/tests/**,go/nemo_relay/**/*_test.go}

⚙️ CodeRabbit configuration file

{crates/**/tests/**,python/tests/**,go/nemo_relay/**/*_test.go}: Tests should cover the behavior promised by the changed API surface, including error paths and cross-request isolation where relevant.
Prefer assertions on lifecycle events, scope stacks, middleware ordering, and binding parity over shallow smoke tests.

Files:

  • crates/core/tests/unit/observability/atof_tests.rs
🔇 Additional comments (10)
crates/core/tests/unit/observability/atof_tests.rs (1)

16-21: LGTM!

Also applies to: 25-25, 121-153, 262-356

docs/observability-plugin/atof.mdx (1)

233-263: LGTM!

crates/core/src/observability/atof.rs (8)

13-17: LGTM!


255-269: LGTM!


271-285: LGTM!


287-294: LGTM!


296-360: LGTM!


372-410: LGTM!


460-503: LGTM!


505-518: LGTM!

Comment thread crates/core/src/observability/atof.rs Outdated
Comment thread crates/core/src/observability/atof.rs
@OmkarMehta OmkarMehta force-pushed the feat/streaming-atof-exporter branch from df40fcf to 4e08e6c Compare May 28, 2026 18:49
@github-actions github-actions Bot added size:L PR is large and removed size:M PR is medium labels May 28, 2026
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@crates/core/src/observability/atof.rs`:
- Around line 318-319: Currently set_nodelay and set_write_timeout on the
TcpStream are called and their Results are discarded; change this to handle
failures instead of ignoring them: replace the silent lets with explicit error
handling (e.g., if let Err(e) = stream.set_nodelay(true) { /* log error with
context and stream id */ return Err(e.into())/close stream } and same for
stream.set_write_timeout(Some(ATOF_STREAM_WRITE_TIMEOUT)) so that a failure to
set timeouts is logged (use the crate's logging/tracing facility) and propagated
or aborts setup to avoid leaving the writer thread running without write
timeouts under backpressure. Ensure the error messages mention set_nodelay,
set_write_timeout and ATOF_STREAM_WRITE_TIMEOUT so they are easy to find in
logs.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Enterprise

Run ID: b13870d3-f62e-45e0-af06-edb3b5f904f6

📥 Commits

Reviewing files that changed from the base of the PR and between df40fcf and 4e08e6c.

📒 Files selected for processing (3)
  • crates/core/src/observability/atof.rs
  • crates/core/tests/unit/observability/atof_tests.rs
  • docs/observability-plugin/atof.mdx
📜 Review details
🧰 Additional context used
📓 Path-based instructions (21)
{docs/**,README.md,CONTRIBUTING.md}

📄 CodeRabbit inference engine (.agents/skills/validate-change/SKILL.md)

{docs/**,README.md,CONTRIBUTING.md}: For docs-only changes, run targeted checks only if commands, package names, or examples changed. Use just docs for docs-site builds and just docs-linkcheck when links changed
Run docs site build with just docs

Files:

  • docs/observability-plugin/atof.mdx
{docs/**,README.md,CONTRIBUTING.md,**/*.md}

📄 CodeRabbit inference engine (.agents/skills/validate-change/SKILL.md)

Run docs link validation with just docs-linkcheck when links change

Files:

  • docs/observability-plugin/atof.mdx
{docs/**,README.md}

📄 CodeRabbit inference engine (.agents/skills/validate-change/SKILL.md)

Verify README and docs entry points still match current package names and paths for large or public-facing changes

Files:

  • docs/observability-plugin/atof.mdx
{docs/**,examples/**,README.md}

📄 CodeRabbit inference engine (.agents/skills/validate-change/SKILL.md)

Verify examples still run with documented commands for large or public-facing changes

Files:

  • docs/observability-plugin/atof.mdx
{docs/**,README.md,**/Cargo.toml,**/package.json,**/*.md}

📄 CodeRabbit inference engine (.agents/skills/validate-change/SKILL.md)

Ensure renamed public surfaces are reflected consistently in manifests and docs for large or public-facing changes

Files:

  • docs/observability-plugin/atof.mdx
**/*.{md,mdx,py,sh,yaml,yml,toml,json}

📄 CodeRabbit inference engine (.agents/skills/contribute-docs/SKILL.md)

Keep package names, repo references, and build commands current

Files:

  • docs/observability-plugin/atof.mdx
**/*.mdx

📄 CodeRabbit inference engine (.agents/skills/contribute-docs/SKILL.md)

In MDX files, top-of-file comments must use JSX comment delimiters: {/* to open and */} to close. Do not use HTML comments for MDX SPDX headers.

MDX top-of-file SPDX comments must use {/* ... */} delimiters instead of HTML comment delimiters (Must-Fix)

Files:

  • docs/observability-plugin/atof.mdx
**/*.{html,md,mdx}

📄 CodeRabbit inference engine (CONTRIBUTING.md)

Include SPDX license header in HTML and Markdown files using HTML comment syntax

Files:

  • docs/observability-plugin/atof.mdx
docs/**/*.{md,mdx}

📄 CodeRabbit inference engine (CONTRIBUTING.md)

Update embedded documentation snippets, patch docs, and binding-support notes if examples or supported bindings changed

Files:

  • docs/observability-plugin/atof.mdx
docs/**

📄 CodeRabbit inference engine (CONTRIBUTING.md)

Run just docs or ./scripts/build-docs.sh html to regenerate ignored Fern API reference pages before validation for documentation site changes

Files:

  • docs/observability-plugin/atof.mdx
{docs/**,README.md,CONTRIBUTING.md,RELEASING.md,SECURITY.md}

⚙️ CodeRabbit configuration file

{docs/**,README.md,CONTRIBUTING.md,RELEASING.md,SECURITY.md}: Review documentation for technical accuracy against the current API, command correctness, and consistency across language bindings.
Flag stale examples, missing SPDX headers where required, and instructions that no longer match CI or pre-commit behavior.

Files:

  • docs/observability-plugin/atof.mdx
**/*.rs

📄 CodeRabbit inference engine (.agents/skills/add-binding-feature/SKILL.md)

Use snake_case naming convention for Rust identifiers (e.g., nemo_relay_tool_call)

**/*.rs: Any Rust change must run just test-rust
Any Rust change must run cargo fmt --all
Any Rust change must run cargo clippy --workspace --all-targets -- -D warnings

**/*.rs: Run cargo fmt --all for all FFI work since it is Rust work
Run just test-rust to validate FFI changes
Run cargo clippy --workspace --all-targets -- -D warnings to enforce strict linting on FFI work

When Rust files changed as part of Go work, also run cargo fmt --all, just test-rust, and cargo clippy --workspace --all-targets -- -D warnings

**/*.rs: Run cargo fmt --all when Rust files are changed as part of Node work
Run cargo clippy --workspace --all-targets -- -D warnings when Rust files are changed as part of Node work
Run just test-rust when Rust files are changed as part of Node work

**/*.rs: Run cargo fmt --all to format all Rust code
Run cargo clippy --workspace --all-targets -- -D warnings to enforce all clippy lints as errors

**/*.rs: Run cargo fmt --all when Rust files changed as part of WebAssembly work
Run cargo clippy --workspace --all-targets -- -D warnings when Rust files changed as part of WebAssembly work

**/*.rs: If any Rust code changed, always run just test-rust
If any Rust code changed, also run cargo fmt --all
If any Rust code changed, also run cargo clippy --workspace --all-targets -- -D warnings
Run Rust formatting with cargo fmt --all
Run Rust linting with cargo clippy --workspace --all-targets -- -D warnings

**/*.rs: Keep SPDX headers on Rust source files. The project is Apache-2.0.
Use snake_case for Rust binding naming conventions.
Use Json = serde_json::Value in Rust-facing runtime APIs where the existing code expects JSON payloads.
Use Result<T> with FlowError in core runtime paths. Keep errors explicit and binding-appropriate at the wrapper layer.
Preserve async behavior on the existing tokio-based model i...

Files:

  • crates/core/tests/unit/observability/atof_tests.rs
  • crates/core/src/observability/atof.rs
{crates/adaptive/**/*.rs,**/*test*.{rs,py,go,ts,js},**/*adaptive*test*.{rs,py,go,ts,js},docs/plugins/adaptive/**}

📄 CodeRabbit inference engine (.agents/skills/maintain-optimizer/SKILL.md)

Maintain documented and tested validation and report behavior for adaptive surfaces

Files:

  • crates/core/tests/unit/observability/atof_tests.rs
**/{Cargo.toml,**/*.rs}

📄 CodeRabbit inference engine (.agents/skills/maintain-packaging/SKILL.md)

Maintain consistency between Rust package names in Cargo.toml and their actual usage across the codebase

Files:

  • crates/core/tests/unit/observability/atof_tests.rs
  • crates/core/src/observability/atof.rs
**/*.{h,hpp,c,cpp,rs}

📄 CodeRabbit inference engine (.agents/skills/maintain-packaging/SKILL.md)

Ensure FFI header and library naming follows consistent conventions across platform-specific builds

Files:

  • crates/core/tests/unit/observability/atof_tests.rs
  • crates/core/src/observability/atof.rs
{crates/core,crates/adaptive}/**/*

📄 CodeRabbit inference engine (.agents/skills/prepare-pr/SKILL.md)

Changes to crates/core or crates/adaptive must run the full language matrix

Files:

  • crates/core/tests/unit/observability/atof_tests.rs
  • crates/core/src/observability/atof.rs
**/*.{rs,toml}

📄 CodeRabbit inference engine (.agents/skills/rename-surfaces/SKILL.md)

Update Rust crate names and module prefixes during coordinated rename operations

Files:

  • crates/core/tests/unit/observability/atof_tests.rs
  • crates/core/src/observability/atof.rs
crates/core/**/*.rs

📄 CodeRabbit inference engine (.agents/skills/test-go-binding/SKILL.md)

If the change touched crates/core or shared runtime semantics, also use validate-change for broader validation

Files:

  • crates/core/tests/unit/observability/atof_tests.rs
  • crates/core/src/observability/atof.rs
crates/{core,adaptive}/**

📄 CodeRabbit inference engine (.agents/skills/validate-change/SKILL.md)

If crates/core or crates/adaptive changed, run the full matrix across Rust, Python, Go, Node.js, and WebAssembly

Files:

  • crates/core/tests/unit/observability/atof_tests.rs
  • crates/core/src/observability/atof.rs
crates/{core,adaptive}/**/*.rs

⚙️ CodeRabbit configuration file

crates/{core,adaptive}/**/*.rs: Review the Rust runtime for async correctness, scope isolation, middleware ordering, and event lifecycle regressions.
Pay close attention to task-local/thread-local scope propagation, callback lifetimes, stream finalization, and root_uuid isolation.
Public API changes should preserve existing behavior unless tests and docs show the intended migration path.

Files:

  • crates/core/tests/unit/observability/atof_tests.rs
  • crates/core/src/observability/atof.rs
{crates/**/tests/**,python/tests/**,go/nemo_relay/**/*_test.go}

⚙️ CodeRabbit configuration file

{crates/**/tests/**,python/tests/**,go/nemo_relay/**/*_test.go}: Tests should cover the behavior promised by the changed API surface, including error paths and cross-request isolation where relevant.
Prefer assertions on lifecycle events, scope stacks, middleware ordering, and binding parity over shallow smoke tests.

Files:

  • crates/core/tests/unit/observability/atof_tests.rs
🔇 Additional comments (4)
crates/core/src/observability/atof.rs (2)

59-66: FlushStream still appears unused.

This concern was already raised earlier and still appears applicable in the current code snapshot.


255-518: Required Rust/core validation matrix should be recorded before merge.

This concern was already raised earlier and still appears applicable for this crates/core Rust change set. As per coding guidelines "Any Rust change must run just test-rust", "Any Rust change must run cargo fmt --all", "Any Rust change must run cargo clippy --workspace --all-targets -- -D warnings", and "crates/{core,adaptive}/**: If crates/core or crates/adaptive changed, run the full matrix across Rust, Python, Go, Node.js, and WebAssembly".

crates/core/tests/unit/observability/atof_tests.rs (1)

25-26: LGTM!

Also applies to: 121-153, 262-356

docs/observability-plugin/atof.mdx (1)

233-279: LGTM!

Comment thread crates/core/src/observability/atof.rs Outdated
@OmkarMehta OmkarMehta force-pushed the feat/streaming-atof-exporter branch 2 times, most recently from 90261bc to 6709d6b Compare May 28, 2026 19:06
@OmkarMehta
Copy link
Copy Markdown
Author

@coderabbitai review

@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented May 28, 2026

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@crates/core/src/observability/atof.rs`:
- Around line 443-523: force_flush currently only checks state.last_error before
enqueueing AtofStreamMessage::Flush and returns success if the Flush reply is
Ok, so any prior write failure stored in state.last_error is ignored; update
force_flush to re-lock state and inspect stream_last_error(&state.last_error)
after receiving the Flush reply (and return StoredStreamFailure if set). For
shutdown, avoid the race by taking/clearing state.sender (and
state.writer_thread if desired) before sending the Flush/Shutdown control
message or by sending the control message and then re-locking to re-check
state.last_error after the round-trip; specifically modify shutdown to call
state.sender.take() before issuing the flush barrier or to re-check
state.last_error after reply so any queued Event failures are propagated in the
returned Result. Ensure references: force_flush, shutdown, state.last_error,
state.sender (state.sender.take()), AtofStreamMessage::Flush and
AtofStreamMessage::Shutdown.

In `@crates/core/tests/unit/observability/atof_tests.rs`:
- Around line 338-382: The test never verifies the documented “receiver sees
only future events” behavior because all events are emitted after
exporter.register(); to fix, emit a trivial scope/event/pop (using
crate::api::scope::push_scope, event, pop_scope with unique names) before
calling AtofStreamingExporter::connect/register and assert the receiver does NOT
receive it (e.g., recv_timeout returns Err or returns zero events), then perform
exporter.register(&name) and emit the three events as now and assert the
receiver sees only those three; reference start_atof_socket_sink,
AtofStreamingExporter::connect, exporter.register, receiver.recv_timeout and the
push_scope/event/pop_scope calls to locate where to insert the pre-registration
emission and the corresponding negative assertion.

In `@docs/observability-plugin/atof.mdx`:
- Around line 267-279: Add the missing serde_json imports at the top of the
example so the symbols used in the receiver (serde_json::Value,
serde_json::from_str, serde_json::to_string) resolve; specifically import the
required items (for example use serde_json::{Value, from_str, to_string};) so
the for-loop parsing and printing (the Value type and from_str/to_string
functions referenced in the snippet) compile.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Enterprise

Run ID: 4d1989f2-110a-4205-bcaa-88dbe707f110

📥 Commits

Reviewing files that changed from the base of the PR and between df40fcf and 6709d6b.

📒 Files selected for processing (3)
  • crates/core/src/observability/atof.rs
  • crates/core/tests/unit/observability/atof_tests.rs
  • docs/observability-plugin/atof.mdx
📜 Review details
🧰 Additional context used
📓 Path-based instructions (21)
{docs/**,README.md,CONTRIBUTING.md}

📄 CodeRabbit inference engine (.agents/skills/validate-change/SKILL.md)

{docs/**,README.md,CONTRIBUTING.md}: For docs-only changes, run targeted checks only if commands, package names, or examples changed. Use just docs for docs-site builds and just docs-linkcheck when links changed
Run docs site build with just docs

Files:

  • docs/observability-plugin/atof.mdx
{docs/**,README.md,CONTRIBUTING.md,**/*.md}

📄 CodeRabbit inference engine (.agents/skills/validate-change/SKILL.md)

Run docs link validation with just docs-linkcheck when links change

Files:

  • docs/observability-plugin/atof.mdx
{docs/**,README.md}

📄 CodeRabbit inference engine (.agents/skills/validate-change/SKILL.md)

Verify README and docs entry points still match current package names and paths for large or public-facing changes

Files:

  • docs/observability-plugin/atof.mdx
{docs/**,examples/**,README.md}

📄 CodeRabbit inference engine (.agents/skills/validate-change/SKILL.md)

Verify examples still run with documented commands for large or public-facing changes

Files:

  • docs/observability-plugin/atof.mdx
{docs/**,README.md,**/Cargo.toml,**/package.json,**/*.md}

📄 CodeRabbit inference engine (.agents/skills/validate-change/SKILL.md)

Ensure renamed public surfaces are reflected consistently in manifests and docs for large or public-facing changes

Files:

  • docs/observability-plugin/atof.mdx
**/*.{md,mdx,py,sh,yaml,yml,toml,json}

📄 CodeRabbit inference engine (.agents/skills/contribute-docs/SKILL.md)

Keep package names, repo references, and build commands current

Files:

  • docs/observability-plugin/atof.mdx
**/*.mdx

📄 CodeRabbit inference engine (.agents/skills/contribute-docs/SKILL.md)

In MDX files, top-of-file comments must use JSX comment delimiters: {/* to open and */} to close. Do not use HTML comments for MDX SPDX headers.

MDX top-of-file SPDX comments must use {/* ... */} delimiters instead of HTML comment delimiters (Must-Fix)

Files:

  • docs/observability-plugin/atof.mdx
**/*.{html,md,mdx}

📄 CodeRabbit inference engine (CONTRIBUTING.md)

Include SPDX license header in HTML and Markdown files using HTML comment syntax

Files:

  • docs/observability-plugin/atof.mdx
docs/**/*.{md,mdx}

📄 CodeRabbit inference engine (CONTRIBUTING.md)

Update embedded documentation snippets, patch docs, and binding-support notes if examples or supported bindings changed

Files:

  • docs/observability-plugin/atof.mdx
docs/**

📄 CodeRabbit inference engine (CONTRIBUTING.md)

Run just docs or ./scripts/build-docs.sh html to regenerate ignored Fern API reference pages before validation for documentation site changes

Files:

  • docs/observability-plugin/atof.mdx
{docs/**,README.md,CONTRIBUTING.md,RELEASING.md,SECURITY.md}

⚙️ CodeRabbit configuration file

{docs/**,README.md,CONTRIBUTING.md,RELEASING.md,SECURITY.md}: Review documentation for technical accuracy against the current API, command correctness, and consistency across language bindings.
Flag stale examples, missing SPDX headers where required, and instructions that no longer match CI or pre-commit behavior.

Files:

  • docs/observability-plugin/atof.mdx
**/*.rs

📄 CodeRabbit inference engine (.agents/skills/add-binding-feature/SKILL.md)

Use snake_case naming convention for Rust identifiers (e.g., nemo_relay_tool_call)

**/*.rs: Any Rust change must run just test-rust
Any Rust change must run cargo fmt --all
Any Rust change must run cargo clippy --workspace --all-targets -- -D warnings

**/*.rs: Run cargo fmt --all for all FFI work since it is Rust work
Run just test-rust to validate FFI changes
Run cargo clippy --workspace --all-targets -- -D warnings to enforce strict linting on FFI work

When Rust files changed as part of Go work, also run cargo fmt --all, just test-rust, and cargo clippy --workspace --all-targets -- -D warnings

**/*.rs: Run cargo fmt --all when Rust files are changed as part of Node work
Run cargo clippy --workspace --all-targets -- -D warnings when Rust files are changed as part of Node work
Run just test-rust when Rust files are changed as part of Node work

**/*.rs: Run cargo fmt --all to format all Rust code
Run cargo clippy --workspace --all-targets -- -D warnings to enforce all clippy lints as errors

**/*.rs: Run cargo fmt --all when Rust files changed as part of WebAssembly work
Run cargo clippy --workspace --all-targets -- -D warnings when Rust files changed as part of WebAssembly work

**/*.rs: If any Rust code changed, always run just test-rust
If any Rust code changed, also run cargo fmt --all
If any Rust code changed, also run cargo clippy --workspace --all-targets -- -D warnings
Run Rust formatting with cargo fmt --all
Run Rust linting with cargo clippy --workspace --all-targets -- -D warnings

**/*.rs: Keep SPDX headers on Rust source files. The project is Apache-2.0.
Use snake_case for Rust binding naming conventions.
Use Json = serde_json::Value in Rust-facing runtime APIs where the existing code expects JSON payloads.
Use Result<T> with FlowError in core runtime paths. Keep errors explicit and binding-appropriate at the wrapper layer.
Preserve async behavior on the existing tokio-based model i...

Files:

  • crates/core/tests/unit/observability/atof_tests.rs
  • crates/core/src/observability/atof.rs
{crates/adaptive/**/*.rs,**/*test*.{rs,py,go,ts,js},**/*adaptive*test*.{rs,py,go,ts,js},docs/plugins/adaptive/**}

📄 CodeRabbit inference engine (.agents/skills/maintain-optimizer/SKILL.md)

Maintain documented and tested validation and report behavior for adaptive surfaces

Files:

  • crates/core/tests/unit/observability/atof_tests.rs
**/{Cargo.toml,**/*.rs}

📄 CodeRabbit inference engine (.agents/skills/maintain-packaging/SKILL.md)

Maintain consistency between Rust package names in Cargo.toml and their actual usage across the codebase

Files:

  • crates/core/tests/unit/observability/atof_tests.rs
  • crates/core/src/observability/atof.rs
**/*.{h,hpp,c,cpp,rs}

📄 CodeRabbit inference engine (.agents/skills/maintain-packaging/SKILL.md)

Ensure FFI header and library naming follows consistent conventions across platform-specific builds

Files:

  • crates/core/tests/unit/observability/atof_tests.rs
  • crates/core/src/observability/atof.rs
{crates/core,crates/adaptive}/**/*

📄 CodeRabbit inference engine (.agents/skills/prepare-pr/SKILL.md)

Changes to crates/core or crates/adaptive must run the full language matrix

Files:

  • crates/core/tests/unit/observability/atof_tests.rs
  • crates/core/src/observability/atof.rs
**/*.{rs,toml}

📄 CodeRabbit inference engine (.agents/skills/rename-surfaces/SKILL.md)

Update Rust crate names and module prefixes during coordinated rename operations

Files:

  • crates/core/tests/unit/observability/atof_tests.rs
  • crates/core/src/observability/atof.rs
crates/core/**/*.rs

📄 CodeRabbit inference engine (.agents/skills/test-go-binding/SKILL.md)

If the change touched crates/core or shared runtime semantics, also use validate-change for broader validation

Files:

  • crates/core/tests/unit/observability/atof_tests.rs
  • crates/core/src/observability/atof.rs
crates/{core,adaptive}/**

📄 CodeRabbit inference engine (.agents/skills/validate-change/SKILL.md)

If crates/core or crates/adaptive changed, run the full matrix across Rust, Python, Go, Node.js, and WebAssembly

Files:

  • crates/core/tests/unit/observability/atof_tests.rs
  • crates/core/src/observability/atof.rs
crates/{core,adaptive}/**/*.rs

⚙️ CodeRabbit configuration file

crates/{core,adaptive}/**/*.rs: Review the Rust runtime for async correctness, scope isolation, middleware ordering, and event lifecycle regressions.
Pay close attention to task-local/thread-local scope propagation, callback lifetimes, stream finalization, and root_uuid isolation.
Public API changes should preserve existing behavior unless tests and docs show the intended migration path.

Files:

  • crates/core/tests/unit/observability/atof_tests.rs
  • crates/core/src/observability/atof.rs
{crates/**/tests/**,python/tests/**,go/nemo_relay/**/*_test.go}

⚙️ CodeRabbit configuration file

{crates/**/tests/**,python/tests/**,go/nemo_relay/**/*_test.go}: Tests should cover the behavior promised by the changed API surface, including error paths and cross-request isolation where relevant.
Prefer assertions on lifecycle events, scope stacks, middleware ordering, and binding parity over shallow smoke tests.

Files:

  • crates/core/tests/unit/observability/atof_tests.rs
🔇 Additional comments (3)
docs/observability-plugin/atof.mdx (2)

233-241: LGTM!


242-263: LGTM!

crates/core/tests/unit/observability/atof_tests.rs (1)

16-25: LGTM!

Also applies to: 121-153, 262-332

Comment thread crates/core/src/observability/atof.rs
Comment thread crates/core/tests/unit/observability/atof_tests.rs
Comment thread docs/observability-plugin/atof.mdx
Signed-off-by: Omkar Mehta <omehta@nvidia.com>
@OmkarMehta OmkarMehta force-pushed the feat/streaming-atof-exporter branch from 6709d6b to 890df17 Compare May 28, 2026 19:24
@OmkarMehta
Copy link
Copy Markdown
Author

@coderabbitai review

@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented May 28, 2026

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented May 28, 2026

Actionable comments posted: 0

@OmkarMehta
Copy link
Copy Markdown
Author

Addressed in the current head (890df17).

  • AtofStreamingExporter now connects to a separate local TCP receiver with TcpStream::connect, so the receiver can run as an independent process.
  • Events are serialized as canonical ATOF JSONL and sent over the socket in real time.
  • Socket writes are moved off the subscriber callback through a bounded queue plus a dedicated writer thread.
  • The commit is signed off: Signed-off-by: Omkar Mehta <omehta@nvidia.com>.

Fresh focused validation: cargo test -p nemo-relay streaming_exporter -- --test-threads=1 -> 6 passed, 541 filtered out.

@willkill07 willkill07 changed the title Add streaming ATOF exporter feat: add streaming ATOF exporter May 28, 2026
@github-actions github-actions Bot added the Feature a new feature label May 28, 2026
@willkill07
Copy link
Copy Markdown
Member

  1. I need you to dismiss/resolve any outdated coderabbit reviews. Force-pushing makes it impossible for coderabbit to automatically resolve outdated suggestions.
  2. The PR should be up-to-date with main
  3. I was first hoping for a design document rather than direct-to-solution. Please give me time to review.

@willkill07 willkill07 added this to the 0.4 milestone May 28, 2026
@omkarmehta06
Copy link
Copy Markdown

Closing this in favor of #173, which carries the same streaming ATOF exporter work from the GitHub account associated with omehta@nvidia.com. Please review #173 going forward.

@omkarmehta06
Copy link
Copy Markdown

Superseded by #173 from the correct GitHub account.

@OmkarMehta OmkarMehta closed this May 28, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Feature a new feature lang:rust PR changes/introduces Rust code size:L PR is large

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants