From 232c587de2fe71ad837496907dfd85fd6d6c8c72 Mon Sep 17 00:00:00 2001 From: Omkar Mehta Date: Thu, 28 May 2026 10:52:23 -0700 Subject: [PATCH 1/9] Add streaming ATOF exporter Signed-off-by: Omkar Mehta --- crates/core/src/observability/atof.rs | 355 +++++++++++++++++- .../tests/unit/observability/atof_tests.rs | 198 ++++++++++ docs/observability-plugin/atof.mdx | 49 +++ 3 files changed, 599 insertions(+), 3 deletions(-) diff --git a/crates/core/src/observability/atof.rs b/crates/core/src/observability/atof.rs index d1bca1d2..c574cd3f 100644 --- a/crates/core/src/observability/atof.rs +++ b/crates/core/src/observability/atof.rs @@ -10,8 +10,11 @@ use std::fs::{File, OpenOptions}; use std::io::{BufWriter, Write}; +use std::net::{Shutdown, TcpStream}; use std::path::{Path, PathBuf}; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Mutex, mpsc}; +use std::thread::JoinHandle; +use std::time::Duration; use chrono::Utc; @@ -45,6 +48,28 @@ pub enum AtofExporterError { /// Underlying I/O error. source: std::io::Error, }, + /// Failed to connect to an ATOF stream receiver. + #[error("failed to connect to ATOF stream receiver {address}: {source}")] + ConnectStream { + /// Address that failed to connect. + address: String, + /// Underlying I/O error. + source: std::io::Error, + }, + /// Failed to configure the ATOF stream connection. + #[error( + "failed to configure ATOF stream receiver {address} with {operation} (ATOF_STREAM_WRITE_TIMEOUT={timeout:?}): {source}" + )] + ConfigureStream { + /// Address associated with the stream. + address: String, + /// Stream option that failed. + operation: &'static str, + /// Write timeout used when configuring the stream. + timeout: Option, + /// Underlying I/O error. + source: std::io::Error, + }, /// The exporter recorded an earlier write or serialization error. #[error("previous ATOF export failed for {path:?}: {message}")] StoredFailure { @@ -53,6 +78,14 @@ pub enum AtofExporterError { /// Stored failure message. message: String, }, + /// The streaming exporter recorded an earlier write or serialization error. + #[error("previous ATOF stream export failed for {address}: {message}")] + StoredStreamFailure { + /// Address associated with the stream. + address: String, + /// Stored failure message. + message: String, + }, /// The internal exporter state lock was poisoned. #[error("the ATOF exporter state lock was poisoned")] LockPoisoned, @@ -225,6 +258,302 @@ impl AtofExporter { } } +/// Configuration for [`AtofStreamingExporter`]. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct AtofStreamingExporterConfig { + /// TCP address for a separate local process that receives ATOF JSONL events. + pub address: String, +} + +impl AtofStreamingExporterConfig { + /// Create a streaming exporter config for the given TCP address. + pub fn new(address: impl Into) -> Self { + Self { + address: address.into(), + } + } +} + +const ATOF_STREAM_QUEUE_BOUND: usize = 1024; +const ATOF_STREAM_WRITE_TIMEOUT: Duration = Duration::from_secs(2); + +enum AtofStreamMessage { + Event(String), + Flush(mpsc::Sender>), + Shutdown(mpsc::Sender>), +} + +struct AtofStreamingExporterState { + sender: Option>, + writer_thread: Option>, + events_sent: u64, + last_error: Arc>>, +} + +/// Snapshot of [`AtofStreamingExporter`] delivery state. +#[derive(Debug, Clone, Default, PartialEq, Eq)] +pub struct AtofStreamingExporterStats { + /// Number of ATOF events observed by the streaming exporter. + pub events_sent: u64, + /// Most recent serialization or exporter state error, if one was recorded. + pub last_error: Option, +} + +/// TCP-backed Agent Trajectory Observability Format (ATOF) event stream exporter. +/// +/// The exporter exposes a regular NeMo Relay event subscriber and writes each +/// canonical ATOF JSON value as one JSONL line to a separate local process over +/// a TCP connection. A local UI, CLI, or bridge process can own the receiving +/// socket and fan events out over HTTP, SSE, WebSocket, stdout, or another +/// transport without redefining the ATOF event contract. +#[derive(Clone)] +pub struct AtofStreamingExporter { + address: String, + state: Arc>, +} + +impl AtofStreamingExporter { + /// Connect to a separate local ATOF stream receiver. + pub fn new(config: AtofStreamingExporterConfig) -> Result { + let address = config.address; + let stream = + TcpStream::connect(&address).map_err(|source| AtofExporterError::ConnectStream { + address: address.clone(), + source, + })?; + stream + .set_nodelay(true) + .map_err(|source| AtofExporterError::ConfigureStream { + address: address.clone(), + operation: "set_nodelay", + timeout: None, + source, + })?; + stream + .set_write_timeout(Some(ATOF_STREAM_WRITE_TIMEOUT)) + .map_err(|source| AtofExporterError::ConfigureStream { + address: address.clone(), + operation: "set_write_timeout", + timeout: Some(ATOF_STREAM_WRITE_TIMEOUT), + source, + })?; + let (sender, receiver) = mpsc::sync_channel(ATOF_STREAM_QUEUE_BOUND); + let last_error = Arc::new(Mutex::new(None)); + let writer_error = Arc::clone(&last_error); + let writer_thread = std::thread::spawn(move || { + let mut writer = BufWriter::new(stream); + while let Ok(message) = receiver.recv() { + match message { + AtofStreamMessage::Event(value) => { + if let Err(error) = write_serialized_event(&mut writer, &value) { + store_stream_error(&writer_error, error); + } + } + AtofStreamMessage::Flush(reply) => { + let result = writer.flush().map_err(|error| error.to_string()); + if let Err(error) = &result { + store_stream_error(&writer_error, error.clone()); + } + let _ = reply.send(result); + } + AtofStreamMessage::Shutdown(reply) => { + let result = writer.flush().map_err(|error| error.to_string()); + if let Err(error) = &result { + store_stream_error(&writer_error, error.clone()); + } + let _ = writer.get_ref().shutdown(Shutdown::Both); + let _ = reply.send(result); + break; + } + } + } + }); + Ok(Self { + address, + state: Arc::new(Mutex::new(AtofStreamingExporterState { + sender: Some(sender), + writer_thread: Some(writer_thread), + events_sent: 0, + last_error, + })), + }) + } + + /// Connect to a separate local ATOF stream receiver at the given TCP address. + pub fn connect(address: impl Into) -> Result { + Self::new(AtofStreamingExporterConfig::new(address)) + } + + /// Return the connected stream receiver address. + pub fn address(&self) -> &str { + &self.address + } + + /// Return an event subscriber that writes one canonical JSONL record per event. + pub fn subscriber(&self) -> EventSubscriberFn { + let state = Arc::clone(&self.state); + Arc::new(move |event: &Event| { + let value = match serialize_event(event) { + Ok(value) => value, + Err(error) => { + if let Ok(state) = state.lock() { + store_stream_error(&state.last_error, error); + } + return; + } + }; + let Ok(mut state) = state.lock() else { + return; + }; + if stream_last_error(&state.last_error).is_some() { + return; + } + let Some(sender) = state.sender.as_ref() else { + store_stream_error(&state.last_error, "stream receiver is closed".to_string()); + return; + }; + match sender.try_send(AtofStreamMessage::Event(value)) { + Ok(()) => { + state.events_sent += 1; + } + Err(mpsc::TrySendError::Full(_)) => { + store_stream_error(&state.last_error, "ATOF stream queue is full".to_string()); + } + Err(mpsc::TrySendError::Disconnected(_)) => { + store_stream_error( + &state.last_error, + "ATOF stream writer is disconnected".to_string(), + ); + } + } + }) + } + + /// Register this streaming exporter globally under the given subscriber name. + pub fn register(&self, name: &str) -> Result<()> { + register_subscriber(name, self.subscriber()).map_err(Into::into) + } + + /// Deregister a global subscriber by name. + pub fn deregister(&self, name: &str) -> Result { + deregister_subscriber(name).map_err(Into::into) + } + + /// Flush the stream and report any stored write error. + pub fn force_flush(&self) -> Result<()> { + let (sender, last_error) = { + let state = self + .state + .lock() + .map_err(|_| AtofExporterError::LockPoisoned)?; + if let Some(message) = stream_last_error(&state.last_error) { + return Err(AtofExporterError::StoredStreamFailure { + address: self.address.clone(), + message, + }); + } + (state.sender.clone(), Arc::clone(&state.last_error)) + }; + let Some(sender) = sender else { + return Ok(()); + }; + let (reply_sender, reply_receiver) = mpsc::channel(); + if sender.send(AtofStreamMessage::Flush(reply_sender)).is_err() { + return Err(AtofExporterError::StoredStreamFailure { + address: self.address.clone(), + message: "ATOF stream writer is disconnected".to_string(), + }); + } + match reply_receiver.recv() { + Ok(Ok(())) => { + if let Some(message) = stream_last_error(&last_error) { + return Err(AtofExporterError::StoredStreamFailure { + address: self.address.clone(), + message, + }); + } + Ok(()) + } + Ok(Err(message)) => Err(AtofExporterError::StoredStreamFailure { + address: self.address.clone(), + message, + }), + Err(error) => Err(AtofExporterError::StoredStreamFailure { + address: self.address.clone(), + message: error.to_string(), + }), + } + } + + /// Shut down the stream by flushing and closing the TCP connection. + pub fn shutdown(&self) -> Result<()> { + let flush_result = self.force_flush(); + let (sender, writer_thread, last_error) = { + let mut state = self + .state + .lock() + .map_err(|_| AtofExporterError::LockPoisoned)?; + ( + state.sender.take(), + state.writer_thread.take(), + Arc::clone(&state.last_error), + ) + }; + let shutdown_result = if let Some(sender) = sender { + let (reply_sender, reply_receiver) = mpsc::channel(); + let send_result = sender + .send(AtofStreamMessage::Shutdown(reply_sender)) + .map_err(|_| AtofExporterError::StoredStreamFailure { + address: self.address.clone(), + message: "ATOF stream writer is disconnected".to_string(), + }); + match send_result { + Ok(()) => match reply_receiver.recv() { + Ok(Ok(())) => Ok(()), + Ok(Err(message)) => Err(AtofExporterError::StoredStreamFailure { + address: self.address.clone(), + message, + }), + Err(error) => Err(AtofExporterError::StoredStreamFailure { + address: self.address.clone(), + message: error.to_string(), + }), + }, + Err(error) => Err(error), + } + } else { + Ok(()) + }; + if let Some(writer_thread) = writer_thread { + let _ = writer_thread.join(); + } + let stored_result = + stream_last_error(&last_error).map(|message| AtofExporterError::StoredStreamFailure { + address: self.address.clone(), + message, + }); + match (flush_result, shutdown_result) { + (Err(error), _) => Err(error), + (Ok(()), Err(error)) => Err(error), + (Ok(()), Ok(())) => stored_result.map_or(Ok(()), Err), + } + } + + /// Return a point-in-time delivery snapshot for diagnostics and tests. + pub fn stats(&self) -> AtofStreamingExporterStats { + let Ok(state) = self.state.lock() else { + return AtofStreamingExporterStats { + last_error: Some("the ATOF streaming exporter state lock was poisoned".to_string()), + ..AtofStreamingExporterStats::default() + }; + }; + AtofStreamingExporterStats { + events_sent: state.events_sent, + last_error: stream_last_error(&state.last_error), + } + } +} + fn default_filename() -> String { format!( "nemo-relay-events-{}.jsonl", @@ -251,15 +580,35 @@ fn open_file(path: &Path, mode: AtofExporterMode) -> Result { }) } -fn write_event(writer: &mut BufWriter, event: &Event) -> std::result::Result<(), String> { +fn write_event(writer: &mut impl Write, event: &Event) -> std::result::Result<(), String> { + write_serialized_event(writer, &serialize_event(event)?) +} + +fn serialize_event(event: &Event) -> std::result::Result { let value = event .try_to_json_value() .map_err(|error| error.to_string())?; - serde_json::to_writer(&mut *writer, &value).map_err(|error| error.to_string())?; + serde_json::to_string(&value).map_err(|error| error.to_string()) +} + +fn write_serialized_event(writer: &mut impl Write, value: &str) -> std::result::Result<(), String> { + writer + .write_all(value.as_bytes()) + .map_err(|error| error.to_string())?; writer.write_all(b"\n").map_err(|error| error.to_string())?; writer.flush().map_err(|error| error.to_string()) } +fn store_stream_error(last_error: &Arc>>, error: String) { + if let Ok(mut last_error) = last_error.lock() { + last_error.get_or_insert(error); + } +} + +fn stream_last_error(last_error: &Arc>>) -> Option { + last_error.lock().ok().and_then(|error| error.clone()) +} + // --------------------------------------------------------------------------- // Tests // --------------------------------------------------------------------------- diff --git a/crates/core/tests/unit/observability/atof_tests.rs b/crates/core/tests/unit/observability/atof_tests.rs index 51fe6af2..669d4bd3 100644 --- a/crates/core/tests/unit/observability/atof_tests.rs +++ b/crates/core/tests/unit/observability/atof_tests.rs @@ -13,10 +13,17 @@ use crate::api::scope::{EmitMarkEventParams, PopScopeParams, PushScopeParams, Sc use crate::codec::request::{AnnotatedLlmRequest, Message, MessageContent}; use serde_json::{Map, json}; use std::fs; +use std::io::{BufRead, BufReader, Read}; +use std::net::TcpListener; use std::sync::Arc; +use std::sync::mpsc; +use std::thread; +use std::time::Duration; use std::time::{SystemTime, UNIX_EPOCH}; use uuid::Uuid; +const TEST_RECV_TIMEOUT: Duration = Duration::from_secs(2); + fn temp_dir(prefix: &str) -> PathBuf { let id = SystemTime::now() .duration_since(UNIX_EPOCH) @@ -111,6 +118,40 @@ fn read_jsonl(path: &Path) -> Vec { .collect() } +fn start_atof_socket_sink( + expected_events: usize, +) -> (String, mpsc::Receiver>) { + let listener = TcpListener::bind("127.0.0.1:0").unwrap(); + let address = listener.local_addr().unwrap().to_string(); + let (sender, receiver) = mpsc::channel(); + thread::spawn(move || { + let (stream, _) = listener.accept().unwrap(); + let mut reader = BufReader::new(stream); + let mut events = Vec::with_capacity(expected_events); + for _ in 0..expected_events { + let mut line = String::new(); + reader.read_line(&mut line).unwrap(); + events.push(serde_json::from_str(line.trim_end()).unwrap()); + } + sender.send(events).unwrap(); + }); + (address, receiver) +} + +fn start_atof_eof_sink() -> (String, mpsc::Receiver<()>) { + let listener = TcpListener::bind("127.0.0.1:0").unwrap(); + let address = listener.local_addr().unwrap().to_string(); + let (sender, receiver) = mpsc::channel(); + thread::spawn(move || { + let (stream, _) = listener.accept().unwrap(); + let mut reader = BufReader::new(stream); + let mut buffer = String::new(); + reader.read_to_string(&mut buffer).unwrap(); + sender.send(()).unwrap(); + }); + (address, receiver) +} + #[test] fn default_config_uses_cwd_append_and_timestamped_filename() { let config = AtofExporterConfig::default(); @@ -218,6 +259,163 @@ fn subscriber_writes_canonical_event_jsonl() { ); } +#[test] +fn streaming_exporter_writes_canonical_event_json_values_to_socket() { + let (address, receiver) = start_atof_socket_sink(1); + let exporter = AtofStreamingExporter::connect(address).unwrap(); + let event = make_annotated_llm_event("streamed-llm-start"); + + (exporter.subscriber())(&event); + exporter.shutdown().unwrap(); + + let delivered = receiver.recv_timeout(TEST_RECV_TIMEOUT).unwrap(); + assert_eq!(delivered[0], event.try_to_json_value().unwrap()); + assert_eq!(exporter.stats().events_sent, 1); +} + +#[test] +fn streaming_exporter_reports_connection_failure() { + let listener = TcpListener::bind("127.0.0.1:0").unwrap(); + let address = listener.local_addr().unwrap().to_string(); + drop(listener); + + let error = match AtofStreamingExporter::connect(address) { + Ok(_) => panic!("expected streaming exporter connection to fail"), + Err(error) => error, + }; + + assert!(matches!(error, AtofExporterError::ConnectStream { .. })); +} + +#[test] +fn streaming_exporter_configuration_error_names_socket_options() { + let error = AtofExporterError::ConfigureStream { + address: "127.0.0.1:65535".to_string(), + operation: "set_write_timeout", + timeout: Some(ATOF_STREAM_WRITE_TIMEOUT), + source: std::io::Error::other("timeout option rejected"), + } + .to_string(); + + assert!(error.contains("set_write_timeout")); + assert!(error.contains("ATOF_STREAM_WRITE_TIMEOUT")); +} + +#[test] +fn streaming_exporter_shutdown_closes_stream_after_stored_error() { + let (address, receiver) = start_atof_eof_sink(); + let exporter = AtofStreamingExporter::connect(address).unwrap(); + + let last_error = Arc::clone(&exporter.state.lock().unwrap().last_error); + *last_error.lock().unwrap() = Some("forced failure".to_string()); + let error = exporter.shutdown().unwrap_err(); + + assert!(matches!( + error, + AtofExporterError::StoredStreamFailure { .. } + )); + receiver.recv_timeout(TEST_RECV_TIMEOUT).unwrap(); +} + +#[test] +fn streaming_exporter_preserves_first_stored_error() { + let last_error = Arc::new(Mutex::new(None)); + + store_stream_error(&last_error, "first failure".to_string()); + store_stream_error(&last_error, "later failure".to_string()); + + assert_eq!( + stream_last_error(&last_error), + Some("first failure".to_string()) + ); +} + +#[test] +fn streaming_exporter_registers_with_runtime_events() { + let _guard = crate::observability::test_mutex().lock().unwrap(); + reset_global(); + + let pre_handle = crate::api::scope::push_scope( + PushScopeParams::builder() + .name("pre_atof_streaming_scope") + .scope_type(ScopeType::Agent) + .input(json!({"before": true})) + .build(), + ) + .unwrap(); + crate::api::scope::event( + EmitMarkEventParams::builder() + .name("pre_atof_streaming_mark") + .parent(&pre_handle) + .data(json!({"before": true})) + .build(), + ) + .unwrap(); + crate::api::scope::pop_scope( + PopScopeParams::builder() + .handle_uuid(&pre_handle.uuid) + .output(json!({"before": true})) + .build(), + ) + .unwrap(); + + let (address, receiver) = start_atof_socket_sink(3); + let exporter = AtofStreamingExporter::connect(address).unwrap(); + let name = format!("atof_streaming_exporter_{}", Uuid::now_v7()); + + exporter.register(&name).unwrap(); + let handle = crate::api::scope::push_scope( + PushScopeParams::builder() + .name("atof_streaming_scope") + .scope_type(ScopeType::Agent) + .input(json!({"scope": true})) + .build(), + ) + .unwrap(); + crate::api::scope::event( + EmitMarkEventParams::builder() + .name("atof_streaming_mark") + .parent(&handle) + .data(json!({"mark": true})) + .build(), + ) + .unwrap(); + crate::api::scope::pop_scope( + PopScopeParams::builder() + .handle_uuid(&handle.uuid) + .output(json!({"done": true})) + .build(), + ) + .unwrap(); + + assert!(exporter.deregister(&name).unwrap()); + assert!(!exporter.deregister(&name).unwrap()); + exporter.shutdown().unwrap(); + + let events = receiver.recv_timeout(TEST_RECV_TIMEOUT).unwrap(); + let scope_start = &events[0]; + let mark = &events[1]; + let scope_end = &events[2]; + + assert_eq!(scope_start["name"], "atof_streaming_scope"); + assert_eq!(scope_start["scope_category"], "start"); + assert_eq!(mark["name"], "atof_streaming_mark"); + assert_eq!(mark["kind"], "mark"); + assert_eq!(scope_end["name"], "atof_streaming_scope"); + assert_eq!(scope_end["scope_category"], "end"); + assert!( + events + .iter() + .all(|event| event["name"] != "pre_atof_streaming_scope") + ); + assert!( + events + .iter() + .all(|event| event["name"] != "pre_atof_streaming_mark") + ); + assert_eq!(exporter.stats().events_sent, 3); +} + #[test] fn register_deregister_flush_and_shutdown_work_with_runtime_events() { let _guard = crate::observability::test_mutex().lock().unwrap(); diff --git a/docs/observability-plugin/atof.mdx b/docs/observability-plugin/atof.mdx index df705473..9cae56af 100644 --- a/docs/observability-plugin/atof.mdx +++ b/docs/observability-plugin/atof.mdx @@ -230,6 +230,55 @@ exporter.shutdown()?; +## Streaming API + +Use `AtofStreamingExporter` when an application needs to consume canonical ATOF +events as they are emitted instead of reading a completed JSONL file. The +streaming exporter connects to a separate local process over TCP and writes one +canonical ATOF JSON object per JSONL line. The receiver process can then fan the +events out through HTTP, SSE, WebSocket, stdout, or another local transport +without changing the ATOF event contract. + +```rust +use nemo_relay::observability::atof::{ + AtofStreamingExporter, AtofStreamingExporterConfig, +}; + +// Start a separate local receiver process before connecting the exporter. +let config = AtofStreamingExporterConfig::new("127.0.0.1:43199"); +let exporter = AtofStreamingExporter::new(config)?; +exporter.register("atof-stream")?; + +// Run instrumented application work here. + +let _ = exporter.deregister("atof-stream")?; +exporter.shutdown()?; +let stats = exporter.stats(); +assert!(stats.events_sent > 0); +``` + +Register the streaming exporter before the instrumented work starts. The +receiver process only observes future events and does not replay earlier +lifecycle events. + +A minimal receiver can block on TCP lines and exit when the exporter closes the +connection. This keeps short idle gaps from ending the stream: + +```rust +use serde_json::{from_str, to_string, Value}; +use std::io::{BufRead, BufReader}; +use std::net::TcpListener; + +let listener = TcpListener::bind("127.0.0.1:43199")?; +let (stream, _) = listener.accept()?; +let reader = BufReader::new(stream); + +for line in reader.lines() { + let event: Value = from_str(&line?)?; + println!("{}", to_string(&event)?); +} +``` + ## Common Validation Failures - `mode` is not `append` or `overwrite`. From e0835e5c9e8d8635f52b927de3620d3570604f38 Mon Sep 17 00:00:00 2001 From: Omkar Mehta Date: Thu, 28 May 2026 14:42:49 -0700 Subject: [PATCH 2/9] Fix streaming ATOF backpressure handling Signed-off-by: Omkar Mehta --- crates/core/src/observability/atof.rs | 7 +- .../tests/unit/observability/atof_tests.rs | 89 ++++++++++++++++--- docs/observability-plugin/atof.mdx | 5 +- 3 files changed, 89 insertions(+), 12 deletions(-) diff --git a/crates/core/src/observability/atof.rs b/crates/core/src/observability/atof.rs index c574cd3f..17548293 100644 --- a/crates/core/src/observability/atof.rs +++ b/crates/core/src/observability/atof.rs @@ -287,6 +287,7 @@ struct AtofStreamingExporterState { sender: Option>, writer_thread: Option>, events_sent: u64, + events_dropped: u64, last_error: Arc>>, } @@ -295,6 +296,8 @@ struct AtofStreamingExporterState { pub struct AtofStreamingExporterStats { /// Number of ATOF events observed by the streaming exporter. pub events_sent: u64, + /// Number of ATOF events dropped because the bounded streaming queue was full. + pub events_dropped: u64, /// Most recent serialization or exporter state error, if one was recorded. pub last_error: Option, } @@ -374,6 +377,7 @@ impl AtofStreamingExporter { sender: Some(sender), writer_thread: Some(writer_thread), events_sent: 0, + events_dropped: 0, last_error, })), }) @@ -417,7 +421,7 @@ impl AtofStreamingExporter { state.events_sent += 1; } Err(mpsc::TrySendError::Full(_)) => { - store_stream_error(&state.last_error, "ATOF stream queue is full".to_string()); + state.events_dropped += 1; } Err(mpsc::TrySendError::Disconnected(_)) => { store_stream_error( @@ -549,6 +553,7 @@ impl AtofStreamingExporter { }; AtofStreamingExporterStats { events_sent: state.events_sent, + events_dropped: state.events_dropped, last_error: stream_last_error(&state.last_error), } } diff --git a/crates/core/tests/unit/observability/atof_tests.rs b/crates/core/tests/unit/observability/atof_tests.rs index 669d4bd3..30217c11 100644 --- a/crates/core/tests/unit/observability/atof_tests.rs +++ b/crates/core/tests/unit/observability/atof_tests.rs @@ -18,7 +18,7 @@ use std::net::TcpListener; use std::sync::Arc; use std::sync::mpsc; use std::thread; -use std::time::Duration; +use std::time::{Duration, Instant}; use std::time::{SystemTime, UNIX_EPOCH}; use uuid::Uuid; @@ -122,32 +122,70 @@ fn start_atof_socket_sink( expected_events: usize, ) -> (String, mpsc::Receiver>) { let listener = TcpListener::bind("127.0.0.1:0").unwrap(); + listener.set_nonblocking(true).unwrap(); let address = listener.local_addr().unwrap().to_string(); let (sender, receiver) = mpsc::channel(); thread::spawn(move || { - let (stream, _) = listener.accept().unwrap(); + let deadline = Instant::now() + TEST_RECV_TIMEOUT; + let stream = loop { + match listener.accept() { + Ok((stream, _)) => break stream, + Err(error) if error.kind() == std::io::ErrorKind::WouldBlock => { + if Instant::now() >= deadline { + let _ = sender.send(Vec::new()); + return; + } + thread::sleep(Duration::from_millis(10)); + } + Err(_) => { + let _ = sender.send(Vec::new()); + return; + } + } + }; + stream.set_read_timeout(Some(TEST_RECV_TIMEOUT)).unwrap(); let mut reader = BufReader::new(stream); let mut events = Vec::with_capacity(expected_events); for _ in 0..expected_events { let mut line = String::new(); - reader.read_line(&mut line).unwrap(); - events.push(serde_json::from_str(line.trim_end()).unwrap()); + match reader.read_line(&mut line) { + Ok(0) | Err(_) => break, + Ok(_) => events.push(serde_json::from_str(line.trim_end()).unwrap()), + } } - sender.send(events).unwrap(); + let _ = sender.send(events); }); (address, receiver) } -fn start_atof_eof_sink() -> (String, mpsc::Receiver<()>) { +fn start_atof_eof_sink() -> (String, mpsc::Receiver) { let listener = TcpListener::bind("127.0.0.1:0").unwrap(); + listener.set_nonblocking(true).unwrap(); let address = listener.local_addr().unwrap().to_string(); let (sender, receiver) = mpsc::channel(); thread::spawn(move || { - let (stream, _) = listener.accept().unwrap(); + let deadline = Instant::now() + TEST_RECV_TIMEOUT; + let stream = loop { + match listener.accept() { + Ok((stream, _)) => break stream, + Err(error) if error.kind() == std::io::ErrorKind::WouldBlock => { + if Instant::now() >= deadline { + let _ = sender.send(false); + return; + } + thread::sleep(Duration::from_millis(10)); + } + Err(_) => { + let _ = sender.send(false); + return; + } + } + }; + stream.set_read_timeout(Some(TEST_RECV_TIMEOUT)).unwrap(); let mut reader = BufReader::new(stream); let mut buffer = String::new(); - reader.read_to_string(&mut buffer).unwrap(); - sender.send(()).unwrap(); + let saw_eof = reader.read_to_string(&mut buffer).is_ok(); + let _ = sender.send(saw_eof); }); (address, receiver) } @@ -273,6 +311,37 @@ fn streaming_exporter_writes_canonical_event_json_values_to_socket() { assert_eq!(exporter.stats().events_sent, 1); } +#[test] +fn streaming_exporter_drops_when_queue_is_full_without_poisoning_stream() { + let (sender, receiver) = mpsc::sync_channel(1); + assert!( + sender + .try_send(AtofStreamMessage::Event("{\"queued\":true}".to_string())) + .is_ok() + ); + let exporter = AtofStreamingExporter { + address: "127.0.0.1:0".to_string(), + state: Arc::new(Mutex::new(AtofStreamingExporterState { + sender: Some(sender), + writer_thread: None, + events_sent: 0, + events_dropped: 0, + last_error: Arc::new(Mutex::new(None)), + })), + }; + + (exporter.subscriber())(&make_mark_event("queue-full")); + + let stats = exporter.stats(); + assert_eq!(stats.events_sent, 0); + assert_eq!(stats.events_dropped, 1); + assert_eq!(stats.last_error, None); + assert!(matches!( + receiver.try_recv().unwrap(), + AtofStreamMessage::Event(value) if value == "{\"queued\":true}" + )); +} + #[test] fn streaming_exporter_reports_connection_failure() { let listener = TcpListener::bind("127.0.0.1:0").unwrap(); @@ -314,7 +383,7 @@ fn streaming_exporter_shutdown_closes_stream_after_stored_error() { error, AtofExporterError::StoredStreamFailure { .. } )); - receiver.recv_timeout(TEST_RECV_TIMEOUT).unwrap(); + assert!(receiver.recv_timeout(TEST_RECV_TIMEOUT).unwrap()); } #[test] diff --git a/docs/observability-plugin/atof.mdx b/docs/observability-plugin/atof.mdx index 9cae56af..0744a695 100644 --- a/docs/observability-plugin/atof.mdx +++ b/docs/observability-plugin/atof.mdx @@ -237,7 +237,10 @@ events as they are emitted instead of reading a completed JSONL file. The streaming exporter connects to a separate local process over TCP and writes one canonical ATOF JSON object per JSONL line. The receiver process can then fan the events out through HTTP, SSE, WebSocket, stdout, or another local transport -without changing the ATOF event contract. +without changing the ATOF event contract. `AtofStreamingExporter` uses a +per-exporter bounded queue: producers do not block, the newest event is dropped +when that bounded queue is full, and `stats().events_dropped` records the +overflow while fatal stream disconnects are still surfaced as errors. ```rust use nemo_relay::observability::atof::{ From 42ccba91ecb200d4d887c7674015a948e2ec5f4c Mon Sep 17 00:00:00 2001 From: Omkar Mehta Date: Thu, 28 May 2026 17:01:36 -0700 Subject: [PATCH 3/9] test: keep ATOF socket sink alive through shutdown Signed-off-by: Omkar Mehta --- crates/core/tests/unit/observability/atof_tests.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/crates/core/tests/unit/observability/atof_tests.rs b/crates/core/tests/unit/observability/atof_tests.rs index 30217c11..05dae5dc 100644 --- a/crates/core/tests/unit/observability/atof_tests.rs +++ b/crates/core/tests/unit/observability/atof_tests.rs @@ -153,6 +153,8 @@ fn start_atof_socket_sink( Ok(_) => events.push(serde_json::from_str(line.trim_end()).unwrap()), } } + let mut drain = String::new(); + let _ = reader.read_to_string(&mut drain); let _ = sender.send(events); }); (address, receiver) From 63eff0d026c3dfb17ece265f362f3011412d6bc0 Mon Sep 17 00:00:00 2001 From: Omkar Mehta Date: Thu, 28 May 2026 20:02:41 -0700 Subject: [PATCH 4/9] test: wait for streaming ATOF shutdown EOF Signed-off-by: Omkar Mehta --- .../tests/unit/observability/atof_tests.rs | 40 +++++++++++++------ 1 file changed, 28 insertions(+), 12 deletions(-) diff --git a/crates/core/tests/unit/observability/atof_tests.rs b/crates/core/tests/unit/observability/atof_tests.rs index 05dae5dc..a0caf5df 100644 --- a/crates/core/tests/unit/observability/atof_tests.rs +++ b/crates/core/tests/unit/observability/atof_tests.rs @@ -118,13 +118,24 @@ fn read_jsonl(path: &Path) -> Vec { .collect() } -fn start_atof_socket_sink( - expected_events: usize, -) -> (String, mpsc::Receiver>) { +struct AtofSocketSink { + address: String, + events: mpsc::Receiver>, + release: mpsc::Sender<()>, +} + +impl AtofSocketSink { + fn release_after_shutdown(&self) { + let _ = self.release.send(()); + } +} + +fn start_atof_socket_sink(expected_events: usize) -> AtofSocketSink { let listener = TcpListener::bind("127.0.0.1:0").unwrap(); listener.set_nonblocking(true).unwrap(); let address = listener.local_addr().unwrap().to_string(); let (sender, receiver) = mpsc::channel(); + let (release_sender, release_receiver) = mpsc::channel(); thread::spawn(move || { let deadline = Instant::now() + TEST_RECV_TIMEOUT; let stream = loop { @@ -153,11 +164,14 @@ fn start_atof_socket_sink( Ok(_) => events.push(serde_json::from_str(line.trim_end()).unwrap()), } } - let mut drain = String::new(); - let _ = reader.read_to_string(&mut drain); let _ = sender.send(events); + let _ = release_receiver.recv_timeout(TEST_RECV_TIMEOUT); }); - (address, receiver) + AtofSocketSink { + address, + events: receiver, + release: release_sender, + } } fn start_atof_eof_sink() -> (String, mpsc::Receiver) { @@ -301,14 +315,15 @@ fn subscriber_writes_canonical_event_jsonl() { #[test] fn streaming_exporter_writes_canonical_event_json_values_to_socket() { - let (address, receiver) = start_atof_socket_sink(1); - let exporter = AtofStreamingExporter::connect(address).unwrap(); + let sink = start_atof_socket_sink(1); + let exporter = AtofStreamingExporter::connect(sink.address.clone()).unwrap(); let event = make_annotated_llm_event("streamed-llm-start"); (exporter.subscriber())(&event); exporter.shutdown().unwrap(); + sink.release_after_shutdown(); - let delivered = receiver.recv_timeout(TEST_RECV_TIMEOUT).unwrap(); + let delivered = sink.events.recv_timeout(TEST_RECV_TIMEOUT).unwrap(); assert_eq!(delivered[0], event.try_to_json_value().unwrap()); assert_eq!(exporter.stats().events_sent, 1); } @@ -430,8 +445,8 @@ fn streaming_exporter_registers_with_runtime_events() { ) .unwrap(); - let (address, receiver) = start_atof_socket_sink(3); - let exporter = AtofStreamingExporter::connect(address).unwrap(); + let sink = start_atof_socket_sink(3); + let exporter = AtofStreamingExporter::connect(sink.address.clone()).unwrap(); let name = format!("atof_streaming_exporter_{}", Uuid::now_v7()); exporter.register(&name).unwrap(); @@ -462,8 +477,9 @@ fn streaming_exporter_registers_with_runtime_events() { assert!(exporter.deregister(&name).unwrap()); assert!(!exporter.deregister(&name).unwrap()); exporter.shutdown().unwrap(); + sink.release_after_shutdown(); - let events = receiver.recv_timeout(TEST_RECV_TIMEOUT).unwrap(); + let events = sink.events.recv_timeout(TEST_RECV_TIMEOUT).unwrap(); let scope_start = &events[0]; let mark = &events[1]; let scope_end = &events[2]; From 525019e5b6827ecbf0e32984ace29a4de2f64423 Mon Sep 17 00:00:00 2001 From: Omkar Mehta Date: Thu, 28 May 2026 22:21:47 -0700 Subject: [PATCH 5/9] test: drain ATOF socket sink until shutdown Signed-off-by: Omkar Mehta --- .../core/tests/unit/observability/atof_tests.rs | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/crates/core/tests/unit/observability/atof_tests.rs b/crates/core/tests/unit/observability/atof_tests.rs index a0caf5df..2b589f1f 100644 --- a/crates/core/tests/unit/observability/atof_tests.rs +++ b/crates/core/tests/unit/observability/atof_tests.rs @@ -121,13 +121,6 @@ fn read_jsonl(path: &Path) -> Vec { struct AtofSocketSink { address: String, events: mpsc::Receiver>, - release: mpsc::Sender<()>, -} - -impl AtofSocketSink { - fn release_after_shutdown(&self) { - let _ = self.release.send(()); - } } fn start_atof_socket_sink(expected_events: usize) -> AtofSocketSink { @@ -135,7 +128,6 @@ fn start_atof_socket_sink(expected_events: usize) -> AtofSocketSink { listener.set_nonblocking(true).unwrap(); let address = listener.local_addr().unwrap().to_string(); let (sender, receiver) = mpsc::channel(); - let (release_sender, release_receiver) = mpsc::channel(); thread::spawn(move || { let deadline = Instant::now() + TEST_RECV_TIMEOUT; let stream = loop { @@ -165,12 +157,13 @@ fn start_atof_socket_sink(expected_events: usize) -> AtofSocketSink { } } let _ = sender.send(events); - let _ = release_receiver.recv_timeout(TEST_RECV_TIMEOUT); + let _ = reader.get_ref().set_read_timeout(None); + let mut drain = String::new(); + let _ = reader.read_to_string(&mut drain); }); AtofSocketSink { address, events: receiver, - release: release_sender, } } @@ -321,7 +314,6 @@ fn streaming_exporter_writes_canonical_event_json_values_to_socket() { (exporter.subscriber())(&event); exporter.shutdown().unwrap(); - sink.release_after_shutdown(); let delivered = sink.events.recv_timeout(TEST_RECV_TIMEOUT).unwrap(); assert_eq!(delivered[0], event.try_to_json_value().unwrap()); @@ -477,7 +469,6 @@ fn streaming_exporter_registers_with_runtime_events() { assert!(exporter.deregister(&name).unwrap()); assert!(!exporter.deregister(&name).unwrap()); exporter.shutdown().unwrap(); - sink.release_after_shutdown(); let events = sink.events.recv_timeout(TEST_RECV_TIMEOUT).unwrap(); let scope_start = &events[0]; From 9cf6dafefe2d2069471dcedebdbb48485cc1d528 Mon Sep 17 00:00:00 2001 From: Omkar Mehta Date: Thu, 28 May 2026 22:25:25 -0700 Subject: [PATCH 6/9] test: assert ATOF socket sink observes EOF Signed-off-by: Omkar Mehta --- crates/core/tests/unit/observability/atof_tests.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/crates/core/tests/unit/observability/atof_tests.rs b/crates/core/tests/unit/observability/atof_tests.rs index 2b589f1f..9c26e319 100644 --- a/crates/core/tests/unit/observability/atof_tests.rs +++ b/crates/core/tests/unit/observability/atof_tests.rs @@ -121,6 +121,7 @@ fn read_jsonl(path: &Path) -> Vec { struct AtofSocketSink { address: String, events: mpsc::Receiver>, + saw_eof: mpsc::Receiver, } fn start_atof_socket_sink(expected_events: usize) -> AtofSocketSink { @@ -128,6 +129,7 @@ fn start_atof_socket_sink(expected_events: usize) -> AtofSocketSink { listener.set_nonblocking(true).unwrap(); let address = listener.local_addr().unwrap().to_string(); let (sender, receiver) = mpsc::channel(); + let (eof_sender, eof_receiver) = mpsc::channel(); thread::spawn(move || { let deadline = Instant::now() + TEST_RECV_TIMEOUT; let stream = loop { @@ -136,12 +138,14 @@ fn start_atof_socket_sink(expected_events: usize) -> AtofSocketSink { Err(error) if error.kind() == std::io::ErrorKind::WouldBlock => { if Instant::now() >= deadline { let _ = sender.send(Vec::new()); + let _ = eof_sender.send(false); return; } thread::sleep(Duration::from_millis(10)); } Err(_) => { let _ = sender.send(Vec::new()); + let _ = eof_sender.send(false); return; } } @@ -159,11 +163,12 @@ fn start_atof_socket_sink(expected_events: usize) -> AtofSocketSink { let _ = sender.send(events); let _ = reader.get_ref().set_read_timeout(None); let mut drain = String::new(); - let _ = reader.read_to_string(&mut drain); + let _ = eof_sender.send(reader.read_to_string(&mut drain).is_ok()); }); AtofSocketSink { address, events: receiver, + saw_eof: eof_receiver, } } @@ -318,6 +323,7 @@ fn streaming_exporter_writes_canonical_event_json_values_to_socket() { let delivered = sink.events.recv_timeout(TEST_RECV_TIMEOUT).unwrap(); assert_eq!(delivered[0], event.try_to_json_value().unwrap()); assert_eq!(exporter.stats().events_sent, 1); + assert!(sink.saw_eof.recv_timeout(TEST_RECV_TIMEOUT).unwrap()); } #[test] @@ -492,6 +498,7 @@ fn streaming_exporter_registers_with_runtime_events() { .all(|event| event["name"] != "pre_atof_streaming_mark") ); assert_eq!(exporter.stats().events_sent, 3); + assert!(sink.saw_eof.recv_timeout(TEST_RECV_TIMEOUT).unwrap()); } #[test] From f884624248ba3cdc65e96d90a02cca8013551c2d Mon Sep 17 00:00:00 2001 From: Omkar Mehta Date: Fri, 29 May 2026 11:44:35 -0700 Subject: [PATCH 7/9] test: keep ATOF socket sinks blocking through shutdown Signed-off-by: Omkar Mehta --- .../tests/unit/observability/atof_tests.rs | 34 +++++++++++++++---- 1 file changed, 28 insertions(+), 6 deletions(-) diff --git a/crates/core/tests/unit/observability/atof_tests.rs b/crates/core/tests/unit/observability/atof_tests.rs index 9c26e319..0433b9d2 100644 --- a/crates/core/tests/unit/observability/atof_tests.rs +++ b/crates/core/tests/unit/observability/atof_tests.rs @@ -13,7 +13,7 @@ use crate::api::scope::{EmitMarkEventParams, PopScopeParams, PushScopeParams, Sc use crate::codec::request::{AnnotatedLlmRequest, Message, MessageContent}; use serde_json::{Map, json}; use std::fs; -use std::io::{BufRead, BufReader, Read}; +use std::io::{BufRead, BufReader, ErrorKind, Read}; use std::net::TcpListener; use std::sync::Arc; use std::sync::mpsc; @@ -118,6 +118,17 @@ fn read_jsonl(path: &Path) -> Vec { .collect() } +fn is_expected_stream_termination(error: &std::io::Error) -> bool { + matches!( + error.kind(), + ErrorKind::BrokenPipe + | ErrorKind::ConnectionAborted + | ErrorKind::ConnectionReset + | ErrorKind::NotConnected + | ErrorKind::UnexpectedEof + ) +} + struct AtofSocketSink { address: String, events: mpsc::Receiver>, @@ -150,6 +161,7 @@ fn start_atof_socket_sink(expected_events: usize) -> AtofSocketSink { } } }; + stream.set_nonblocking(false).unwrap(); stream.set_read_timeout(Some(TEST_RECV_TIMEOUT)).unwrap(); let mut reader = BufReader::new(stream); let mut events = Vec::with_capacity(expected_events); @@ -163,7 +175,11 @@ fn start_atof_socket_sink(expected_events: usize) -> AtofSocketSink { let _ = sender.send(events); let _ = reader.get_ref().set_read_timeout(None); let mut drain = String::new(); - let _ = eof_sender.send(reader.read_to_string(&mut drain).is_ok()); + let saw_termination = match reader.read_to_string(&mut drain) { + Ok(_) => true, + Err(error) => is_expected_stream_termination(&error), + }; + let _ = eof_sender.send(saw_termination); }); AtofSocketSink { address, @@ -195,11 +211,15 @@ fn start_atof_eof_sink() -> (String, mpsc::Receiver) { } } }; + stream.set_nonblocking(false).unwrap(); stream.set_read_timeout(Some(TEST_RECV_TIMEOUT)).unwrap(); let mut reader = BufReader::new(stream); let mut buffer = String::new(); - let saw_eof = reader.read_to_string(&mut buffer).is_ok(); - let _ = sender.send(saw_eof); + let saw_termination = match reader.read_to_string(&mut buffer) { + Ok(_) => true, + Err(error) => is_expected_stream_termination(&error), + }; + let _ = sender.send(saw_termination); }); (address, receiver) } @@ -318,9 +338,10 @@ fn streaming_exporter_writes_canonical_event_json_values_to_socket() { let event = make_annotated_llm_event("streamed-llm-start"); (exporter.subscriber())(&event); - exporter.shutdown().unwrap(); let delivered = sink.events.recv_timeout(TEST_RECV_TIMEOUT).unwrap(); + exporter.shutdown().unwrap(); + assert_eq!(delivered[0], event.try_to_json_value().unwrap()); assert_eq!(exporter.stats().events_sent, 1); assert!(sink.saw_eof.recv_timeout(TEST_RECV_TIMEOUT).unwrap()); @@ -474,9 +495,10 @@ fn streaming_exporter_registers_with_runtime_events() { assert!(exporter.deregister(&name).unwrap()); assert!(!exporter.deregister(&name).unwrap()); - exporter.shutdown().unwrap(); let events = sink.events.recv_timeout(TEST_RECV_TIMEOUT).unwrap(); + exporter.shutdown().unwrap(); + let scope_start = &events[0]; let mark = &events[1]; let scope_end = &events[2]; From d1c35fc95be8a630a7cfb16c0897b507b595a4b4 Mon Sep 17 00:00:00 2001 From: Omkar Mehta Date: Fri, 29 May 2026 11:51:54 -0700 Subject: [PATCH 8/9] docs: propose local cockpit viewer boundary Signed-off-by: Omkar Mehta --- docs/observability-plugin/about.mdx | 2 + .../local-cockpit-viewer.mdx | 234 ++++++++++++++++++ 2 files changed, 236 insertions(+) create mode 100644 docs/observability-plugin/local-cockpit-viewer.mdx diff --git a/docs/observability-plugin/about.mdx b/docs/observability-plugin/about.mdx index 5e3eeef8..961697b6 100644 --- a/docs/observability-plugin/about.mdx +++ b/docs/observability-plugin/about.mdx @@ -89,3 +89,5 @@ are not written into ATIF. - [OpenTelemetry](/observability-plugin/opentelemetry) covers generic OTLP trace export. - [OpenInference](/observability-plugin/openinference) covers OpenInference-oriented OTLP trace export. +- [Local Cockpit Viewer Design](/observability-plugin/local-cockpit-viewer) proposes a local + viewer boundary on top of Relay events. diff --git a/docs/observability-plugin/local-cockpit-viewer.mdx b/docs/observability-plugin/local-cockpit-viewer.mdx new file mode 100644 index 00000000..9fe2e789 --- /dev/null +++ b/docs/observability-plugin/local-cockpit-viewer.mdx @@ -0,0 +1,234 @@ +--- +title: "Local Cockpit Viewer Design" +description: "" +position: 7 +--- +{/* SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +SPDX-License-Identifier: Apache-2.0 */} + +This page proposes a local cockpit viewer for NeMo Relay observability data. +The viewer is a consumer of Relay events, not a second tracing system. Relay +continues to own capture, lifecycle events, ATOF, ATIF, exporter contracts, +sanitization hooks, and missing-field semantics. The cockpit owns local +developer UX on top of those contracts. + +## Problem + +Coding agents can run across shells, editors, providers, plugins, tools, and +subagents. Developers often need to answer practical questions while a run is +still warm: + +- What session or prompt is active? +- Which model, tool, file, skill, memory, or subagent drove the result? +- Which token or cost fields are observed, and which are unavailable? +- Which hook or provider limitation explains missing trace detail? +- What can be safely shared, replayed, or resumed? + +File export alone is useful for post-run analysis, but local agent work also +needs a live read-only surface. The goal is to make traces inspectable without +turning NeMo Relay into an opinionated dashboard product. + +## Product Boundary + +Relay should provide the generic substrate that every downstream consumer can +reuse: + +- Runtime capture and lifecycle events. +- ATOF as the Relay-native event stream. +- ATIF and other exporter formats for trajectory and trace consumers. +- Streaming export from the instrumented process to a separate local process. +- Token, cost, and capture-quality fields when they are broadly useful. +- Sanitizer and redaction extension points before data leaves the developer + machine or process boundary. +- Stable examples and fixtures that show how a viewer consumes Relay output. + +The cockpit should provide the developer-facing experience: + +- Session list, prompt timeline, event tree, file impact, tool calls, skills, + memory, subagents, and usage panels. +- Linked selection between timeline, event details, context, files, and usage. +- Clear provenance labels such as `observed`, `inferred`, `unavailable`, or + `unsupported`. +- Local-only read mode by default. +- Optional packaging outside Relay if the UX grows into a separate application. + +This separation lets Relay stay the source of truth for events while still +supporting polished local UIs. + +## Proposed Shape + +The first useful contribution can be small: + +1. Keep the streaming ATOF exporter as a low-level Relay capability. +2. Add a documented local viewer contract for consumers of the ATOF stream. +3. Add one tiny example receiver that reads ATOF JSONL from the streaming + exporter and exposes it to a browser or CLI. +4. Keep the full cockpit implementation outside the runtime unless maintainers + decide that a reference viewer belongs in this repository. + +The important design constraint is process separation. An agent process should +emit events to a local receiver over a socket or HTTP-style boundary. The viewer +process should own UI state, filtering, fan-out, and browser transport. That +keeps the agent runtime lean and lets the receiver restart or evolve without +replacing Relay capture. + +```mermaid +flowchart LR + Agent["Instrumented agent or harness"] --> Relay["NeMo Relay runtime"] + Relay --> ATOF["Streaming ATOF exporter"] + ATOF --> Receiver["Local receiver process"] + Receiver --> Browser["Local cockpit UI"] + Receiver --> CLI["CLI inspection"] + Receiver --> Files["Optional local trace files"] +``` + +## Event Contract + +The viewer should treat ATOF as the live event source and ATIF as a trajectory +or replay artifact. ATOF is the right stream for timeline and live state because +it preserves lifecycle order. ATIF is the right shape for post-run trajectory +analysis, evaluation, and interchange. + +Viewer consumers need field provenance. A field should not silently appear more +complete than the capture layer can prove. Suggested provenance values: + +| Provenance | Meaning | +|---|---| +| `observed` | Relay captured the field directly from runtime, provider, or wrapper data. | +| `derived` | Relay derived the field from other observed Relay data. | +| `inferred` | A viewer inferred the field from local files, names, timestamps, or heuristics. | +| `unavailable` | Relay knows the field exists conceptually but did not capture it. | +| `unsupported` | The current provider, hook, or integration cannot expose the field. | + +Examples: + +- Token counts should be `observed` only when Relay receives real usage fields. +- Cost can be `derived` only from observed token counts plus a declared pricing + source. It should not claim invoice truth. +- Missing Codex hooks should be visible as `unsupported` or `unavailable` + instead of being papered over by UI guesses. +- Local file-touch summaries can be `inferred` if they come from local history + rather than Relay events. + +## Capture Quality Signals + +Relay should expose generic capture-quality signals when they help every +consumer: + +- Provider supported or unsupported. +- Hook present or missing. +- LLM span unavailable. +- Tool call input or output redacted. +- Token usage unavailable. +- Cost unavailable or derived from local policy. +- Subagent hierarchy unavailable. +- Event dropped due to backpressure. +- Exporter disconnected or receiver unavailable. + +These signals are more useful than blank panels because they tell the developer +whether the system is quiet, incomplete, or unsupported. + +## Local Viewer UX + +A good local cockpit should help developers answer questions quickly: + +- A left rail lists runs and sessions by human-readable title, provider, age, + state, and workspace. +- A center timeline shows prompt, scope, tool, LLM, file, and subagent events. +- A detail pane explains the selected event with exact field provenance. +- A usage panel shows per-session token and cost evidence, not only totals. +- A file-impact view groups touched files by path and highlights hot areas. +- A compare mode shows two runs side by side when debugging regressions. + +The viewer should avoid pretending to be complete. Empty panels should say why +the data is missing and which integration or hook would make it available. + +## Packaging Options + +There are three reasonable packaging paths: + +| Option | What ships in Relay | What ships outside Relay | +|---|---|---| +| Example viewer | Minimal receiver and browser example | Full product cockpit | +| Companion package | Viewer package maintained beside Relay APIs | Enterprise installers and branding | +| External app | Only exporter contract, fixtures, and docs | Entire cockpit implementation | + +The proposed default is example viewer first. It validates the Relay contract +without committing Relay maintainers to own a full UI product. If the example +becomes broadly useful, it can graduate to a companion package. + +## Redaction And Sharing + +Local-first does not remove the need for sanitization. Before traces are shared +outside the developer machine, the pipeline should support: + +- Provider payload redaction. +- File path and workspace redaction. +- Secret pattern redaction. +- Prompt and response redaction. +- Clear marking of redacted fields in the viewer. +- Export policies that separate local inspection from shareable artifacts. + +The viewer should display redaction status explicitly so reviewers know whether +they are looking at raw local evidence or a share-safe trace. + +## Non-Goals + +The first design should not: + +- Replace ATIF, OpenTelemetry, or OpenInference. +- Define a new tracing schema parallel to Relay events. +- Require Relay to own a large frontend application. +- Claim real billing cost without observed usage and an explicit pricing source. +- Infer missing provider data without provenance. + +## Milestones + +### M0: Alignment + +- Agree that the cockpit is a Relay event consumer. +- Agree that Relay owns generic event/exporter/schema pieces. +- Decide whether the first viewer artifact should be an example, companion + package, or separate application. + +### M1: Streaming ATOF + +- Provide a streaming ATOF exporter from the instrumented process to a local + receiver process. +- Document backpressure, disconnect, shutdown, and dropped-event behavior. +- Add socket-based tests that model a separate receiver process. + +### M2: Viewer Contract + +- Document the event fields and provenance states the viewer can rely on. +- Add a fixture trace that includes prompt, scope, tool, LLM, token, missing + field, and redaction examples. +- Provide a minimal receiver that proves live consumption. + +### M3: Local Cockpit Prototype + +- Render a session list, event timeline, selected-event inspector, and per-run + usage evidence. +- Label unavailable fields instead of filling them with guesses. +- Keep the prototype local-only and read-only by default. + +### M4: Product Decision + +- Decide whether the UI remains a separate project, becomes a Relay companion + package, or stays as a reference example. +- Promote only generic contracts and reusable utilities into Relay. + +## Open Questions + +1. Should Relay ship only a minimal example viewer, or should it own a companion + viewer package? +2. Should live viewers consume ATOF directly, or should Relay add a receiver + process that transforms ATOF into browser-friendly Server-Sent Events or + WebSocket messages? +3. Which token and cost fields should be first-class Relay fields versus + viewer-local derived values? +4. What is the preferred sanitizer API for share-safe local traces? +5. Which Codex, Claude Code, Cursor, or harness hooks should emit explicit + capture-quality signals when data is missing? +6. Should cockpit examples live in `examples/`, `integrations/`, or the docs + site? From f1d01e08d8580a53c95d3d6a3b7dd0acae33029e Mon Sep 17 00:00:00 2001 From: Omkar Mehta Date: Sat, 30 May 2026 01:57:01 -0700 Subject: [PATCH 9/9] Wire streaming ATOF through observability plugin Signed-off-by: Omkar Mehta --- crates/cli/src/plugins/editor_model.rs | 12 +- .../src/observability/plugin_component.rs | 117 +++++++++ .../observability/plugin_component_tests.rs | 145 ++++++++++- crates/node/observability.d.ts | 8 + crates/node/observability.js | 14 ++ .../node/tests/observability_plugin_tests.mjs | 8 +- docs/observability-plugin/about.mdx | 6 +- docs/observability-plugin/atof.mdx | 4 + docs/observability-plugin/configuration.mdx | 23 +- .../local-cockpit-viewer.mdx | 234 ------------------ .../streaming-atof-design.mdx | 113 +++++++++ go/nemo_relay/observability_plugin.go | 24 +- go/nemo_relay/observability_plugin_test.go | 13 +- python/nemo_relay/observability.py | 20 ++ python/nemo_relay/observability.pyi | 7 + python/tests/test_observability_plugin.py | 5 +- 16 files changed, 499 insertions(+), 254 deletions(-) delete mode 100644 docs/observability-plugin/local-cockpit-viewer.mdx create mode 100644 docs/observability-plugin/streaming-atof-design.mdx diff --git a/crates/cli/src/plugins/editor_model.rs b/crates/cli/src/plugins/editor_model.rs index 4c8fab0e..3bcbf1cd 100644 --- a/crates/cli/src/plugins/editor_model.rs +++ b/crates/cli/src/plugins/editor_model.rs @@ -33,9 +33,9 @@ pub(super) struct ComponentEditorState { #[derive(Debug)] pub(super) enum EditableComponent { - Observability(ComponentEditorState), - Adaptive(ComponentEditorState), - NemoGuardrails(ComponentEditorState), + Observability(Box>), + Adaptive(Box>), + NemoGuardrails(Box>), } impl EditableComponent { @@ -148,9 +148,9 @@ pub(super) fn editable_components( config: &PluginConfig, ) -> Result, CliError> { Ok(vec![ - EditableComponent::Observability(component_observability_state(config)?), - EditableComponent::Adaptive(component_adaptive_state(config)?), - EditableComponent::NemoGuardrails(component_nemo_guardrails_state(config)?), + EditableComponent::Observability(Box::new(component_observability_state(config)?)), + EditableComponent::Adaptive(Box::new(component_adaptive_state(config)?)), + EditableComponent::NemoGuardrails(Box::new(component_nemo_guardrails_state(config)?)), ]) } diff --git a/crates/core/src/observability/plugin_component.rs b/crates/core/src/observability/plugin_component.rs index 3da978a4..b443429d 100644 --- a/crates/core/src/observability/plugin_component.rs +++ b/crates/core/src/observability/plugin_component.rs @@ -35,6 +35,7 @@ use crate::api::subscriber::{scope_deregister_subscriber, scope_register_subscri use crate::observability::atif::{AtifAgentInfo, AtifExporter}; use crate::observability::atof::{ AtofExporter, AtofExporterConfig as CoreAtofExporterConfig, AtofExporterMode, + AtofStreamingExporter, AtofStreamingExporterConfig as CoreAtofStreamingExporterConfig, }; #[cfg(feature = "openinference")] use crate::observability::openinference::{ @@ -110,6 +111,9 @@ pub struct ObservabilityConfig { /// Filesystem-backed raw ATOF JSONL export. #[serde(default, skip_serializing_if = "Option::is_none")] pub atof: Option, + /// TCP-backed raw ATOF JSONL stream export to a separate receiver process. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub atof_stream: Option, /// Per-top-level-agent ATIF trajectory export. #[serde(default, skip_serializing_if = "Option::is_none")] pub atif: Option, @@ -129,6 +133,7 @@ impl Default for ObservabilityConfig { Self { version: default_observability_config_version(), atof: None, + atof_stream: None, atif: None, opentelemetry: None, openinference: None, @@ -172,6 +177,24 @@ impl Default for AtofSectionConfig { } } +/// TCP-backed raw ATOF JSONL stream exporter config. +/// +/// When enabled, this section wraps +/// [`crate::observability::atof::AtofStreamingExporter`] and connects to a +/// separate local receiver process. The receiver owns UI, HTTP, SSE, +/// WebSocket, or any other fan-out; Relay only emits canonical ATOF JSONL over +/// the configured TCP socket. +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))] +pub struct AtofStreamSectionConfig { + /// Whether ATOF stream export is active. + #[serde(default)] + pub enabled: bool, + /// TCP address for the receiver process, for example `127.0.0.1:43199`. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub address: Option, +} + /// Per-trajectory ATIF exporter config. /// /// When enabled, this section creates a dispatcher that opens a separate @@ -372,6 +395,13 @@ crate::editor_config! { nested: AtofSectionConfig, default: AtofSectionConfig, }, + atof_stream => { + label: "ATOF Stream", + kind: Section, + optional: true, + nested: AtofStreamSectionConfig, + default: AtofStreamSectionConfig, + }, atif => { label: "ATIF", kind: Section, @@ -411,6 +441,13 @@ crate::editor_config! { } } +crate::editor_config! { + impl AtofStreamSectionConfig { + enabled => { label: "enabled", kind: Boolean }, + address => { label: "address", kind: String, optional: true }, + } +} + crate::editor_config! { impl AtifSectionConfig { enabled => { label: "enabled", kind: Boolean }, @@ -538,6 +575,9 @@ fn register_observability( if let Some(atof) = config.atof.filter(|section| section.enabled) { register_atof_exporter(atof, ctx)?; } + if let Some(atof_stream) = config.atof_stream.filter(|section| section.enabled) { + register_atof_stream_exporter(atof_stream, ctx)?; + } if let Some(atif) = config.atif.filter(|section| section.enabled) { register_atif_dispatcher(atif, ctx)?; } @@ -579,6 +619,38 @@ fn register_atof_exporter( Ok(()) } +fn register_atof_stream_exporter( + section: AtofStreamSectionConfig, + ctx: &mut PluginRegistrationContext, +) -> PluginResult<()> { + let Some(address) = section + .address + .as_deref() + .map(str::trim) + .filter(|address| !address.is_empty()) + .map(ToOwned::to_owned) + else { + return Err(PluginError::InvalidConfig( + "ATOF stream address is required when atof_stream is enabled".to_string(), + )); + }; + let exporter = Arc::new( + AtofStreamingExporter::new(CoreAtofStreamingExporterConfig::new(address)) + .map_err(observability_registration_error)?, + ); + ctx.register_subscriber("atof_stream", exporter.subscriber())?; + ctx.add_registration(PluginRegistration::new( + "observability", + ctx.qualify_name("atof_stream.shutdown"), + Box::new(move || { + exporter + .shutdown() + .map_err(observability_registration_error) + }), + )); + Ok(()) +} + type AtifStorageList = Arc>>; fn register_atif_dispatcher( @@ -1287,6 +1359,7 @@ fn validate_observability_plugin_config( &[ "version", "atof", + "atof_stream", "atif", "opentelemetry", "openinference", @@ -1303,6 +1376,13 @@ fn validate_observability_plugin_config( "atof", &["enabled", "output_directory", "filename", "mode"], ); + validate_section_fields( + &mut diagnostics, + &config.policy, + plugin_config, + "atof_stream", + &["enabled", "address"], + ); validate_section_fields( &mut diagnostics, &config.policy, @@ -1371,6 +1451,20 @@ fn validate_observability_plugin_config( ); } } + if let Some(section) = &config.atof_stream { + validate_atof_stream_values(&mut diagnostics, &config.policy, section); + #[cfg(target_arch = "wasm32")] + if section.enabled { + push_policy_diag( + &mut diagnostics, + config.policy.unsupported_value, + "observability.unsupported_value", + Some("atof_stream".to_string()), + Some("enabled".to_string()), + "ATOF stream export is not supported on WebAssembly".to_string(), + ); + } + } if let Some(section) = &config.atif { validate_atif_values(&mut diagnostics, &config.policy, section); #[cfg(target_arch = "wasm32")] @@ -1492,6 +1586,29 @@ fn validate_atof_values( } } +fn validate_atof_stream_values( + diagnostics: &mut Vec, + policy: &ConfigPolicy, + section: &AtofStreamSectionConfig, +) { + if section.enabled + && section + .address + .as_deref() + .map(str::trim) + .is_none_or(str::is_empty) + { + push_policy_diag( + diagnostics, + policy.unsupported_value, + "observability.unsupported_value", + Some("atof_stream".to_string()), + Some("address".to_string()), + "atof_stream address is required when enabled".to_string(), + ); + } +} + fn validate_atif_values( diagnostics: &mut Vec, policy: &ConfigPolicy, diff --git a/crates/core/tests/unit/observability/plugin_component_tests.rs b/crates/core/tests/unit/observability/plugin_component_tests.rs index 540848f4..cd66bf97 100644 --- a/crates/core/tests/unit/observability/plugin_component_tests.rs +++ b/crates/core/tests/unit/observability/plugin_component_tests.rs @@ -17,7 +17,53 @@ use crate::plugin::{ }; use serde_json::json; use std::fs; -use std::time::{SystemTime, UNIX_EPOCH}; +use std::io::{BufRead, BufReader, ErrorKind}; +use std::net::TcpListener; +use std::sync::mpsc; +use std::thread; +use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; + +struct AtofStreamSink { + address: String, + receiver: mpsc::Receiver, +} + +fn start_atof_stream_sink() -> AtofStreamSink { + let listener = TcpListener::bind("127.0.0.1:0").unwrap(); + listener.set_nonblocking(true).unwrap(); + let address = listener.local_addr().unwrap().to_string(); + let (sender, receiver) = mpsc::channel(); + thread::spawn(move || { + let deadline = Instant::now() + Duration::from_secs(2); + loop { + match listener.accept() { + Ok((stream, _)) => { + let _ = stream.set_read_timeout(Some(Duration::from_secs(2))); + let reader = BufReader::new(stream); + for line in reader.lines() { + match line { + Ok(line) if !line.trim().is_empty() => { + if let Ok(value) = serde_json::from_str(&line) { + let _ = sender.send(value); + } + } + Ok(_) => {} + Err(_) => break, + } + } + break; + } + Err(error) + if error.kind() == ErrorKind::WouldBlock && Instant::now() < deadline => + { + thread::sleep(Duration::from_millis(10)); + } + Err(_) => break, + } + } + }); + AtofStreamSink { address, receiver } +} fn temp_dir(prefix: &str) -> PathBuf { let id = SystemTime::now() @@ -68,6 +114,18 @@ fn editor_schema_tracks_observability_config_types() { assert_eq!(mode.kind, EditorFieldKind::Enum); assert_eq!(mode.enum_values, &["append", "overwrite"]); + let stream = schema.field("atof_stream").expect("atof_stream section"); + assert_eq!(stream.label, "ATOF Stream"); + assert_eq!(stream.kind, EditorFieldKind::Section); + assert!(stream.optional); + + let stream_schema = stream.schema().expect("atof_stream editor schema"); + let address = stream_schema + .field("address") + .expect("stream address field"); + assert_eq!(address.kind, EditorFieldKind::String); + assert!(address.optional); + let otlp = schema .field("openinference") .expect("openinference section") @@ -161,6 +219,7 @@ fn default_config_and_component_conversion_cover_public_shape() { let defaults = ObservabilityConfig::default(); assert_eq!(defaults.version, 1); assert!(defaults.atof.is_none()); + assert!(defaults.atof_stream.is_none()); assert!(defaults.atif.is_none()); assert!(defaults.opentelemetry.is_none()); assert!(defaults.openinference.is_none()); @@ -171,6 +230,10 @@ fn default_config_and_component_conversion_cover_public_shape() { assert!(atof.output_directory.is_none()); assert!(atof.filename.is_none()); + let atof_stream = AtofStreamSectionConfig::default(); + assert!(!atof_stream.enabled); + assert!(atof_stream.address.is_none()); + let atif = AtifSectionConfig::default(); assert!(!atif.enabled); assert_eq!(atif.agent_name, "NeMo Relay"); @@ -186,6 +249,7 @@ fn default_config_and_component_conversion_cover_public_shape() { let generic: PluginComponentSpec = ComponentSpec::new(ObservabilityConfig { atof: Some(atof), + atof_stream: Some(atof_stream), atif: Some(atif), opentelemetry: Some(otlp.clone()), openinference: Some(otlp), @@ -195,6 +259,7 @@ fn default_config_and_component_conversion_cover_public_shape() { assert_eq!(generic.kind, OBSERVABILITY_PLUGIN_KIND); assert!(generic.enabled); assert_eq!(generic.config["version"], json!(1)); + assert_eq!(generic.config["atof_stream"]["enabled"], json!(false)); assert_eq!(generic.config["atif"]["agent_name"], json!("NeMo Relay")); } @@ -205,6 +270,7 @@ fn schema_contains_every_supported_observability_option() { for field in [ "version", "atof", + "atof_stream", "atif", "opentelemetry", "openinference", @@ -213,6 +279,7 @@ fn schema_contains_every_supported_observability_option() { "output_directory", "filename", "mode", + "address", "agent_name", "agent_version", "model_name", @@ -310,6 +377,7 @@ fn empty_and_disabled_config_register_nothing() { let config = plugin_config(json!({ "atof": {"enabled": false, "mode": "overwrite"}, + "atof_stream": {"enabled": false}, "atif": {"enabled": false}, "opentelemetry": {"enabled": false, "transport": "grpc"}, "openinference": {"enabled": false, "transport": "grpc"} @@ -377,6 +445,7 @@ fn unknown_fields_and_bad_values_follow_policy() { let warn_report = validate_plugin_config(&plugin_config(json!({ "atof": {"bogus": true, "mode": "invalid"}, + "atof_stream": {"enabled": true}, "atif": {"filename_template": "missing-session"} }))); assert!(warn_report.has_errors()); @@ -398,10 +467,18 @@ fn unknown_fields_and_bad_values_follow_policy() { .iter() .any(|diag| diag.field.as_deref() == Some("filename_template")) ); + assert!( + warn_report + .diagnostics + .iter() + .any(|diag| diag.component.as_deref() == Some("atof_stream") + && diag.field.as_deref() == Some("address")) + ); let ignore_report = validate_plugin_config(&plugin_config(json!({ "policy": {"unknown_field": "ignore", "unsupported_value": "ignore"}, "atof": {"bogus": true, "mode": "invalid"}, + "atof_stream": {"enabled": true}, "atif": {"filename_template": "missing-session"} }))); assert!(!ignore_report.has_errors()); @@ -520,6 +597,13 @@ fn initialization_fails_for_invalid_enabled_file_exporters() { let error = futures::executor::block_on(initialize_plugins(invalid_openinference_transport)) .unwrap_err(); assert!(error.to_string().contains("OpenInference transport")); + + let invalid_atof_stream = plugin_config(json!({ + "policy": {"unsupported_value": "ignore"}, + "atof_stream": {"enabled": true} + })); + let error = futures::executor::block_on(initialize_plugins(invalid_atof_stream)).unwrap_err(); + assert!(error.to_string().contains("ATOF stream address")); } #[test] @@ -570,6 +654,65 @@ fn atof_enabled_writes_jsonl_and_teardown_flushes() { assert!(lines[2].contains("\"scope_category\":\"end\"")); } +#[test] +fn atof_stream_enabled_writes_jsonl_to_receiver_and_teardown_flushes() { + let _guard = crate::observability::test_mutex().lock().unwrap(); + reset_runtime(); + let sink = start_atof_stream_sink(); + + let config = plugin_config(json!({ + "atof_stream": { + "enabled": true, + "address": sink.address + } + })); + assert!(!validate_plugin_config(&config).has_errors()); + futures::executor::block_on(initialize_plugins(config)).unwrap(); + + { + let state = global_context(); + let names = state + .read() + .unwrap() + .event_subscribers + .keys() + .cloned() + .collect::>(); + assert_eq!( + names, + vec!["__nemo_relay_plugin__observability__atof_stream"] + ); + } + + let agent = push_agent("atof-stream-agent"); + crate::api::scope::event( + crate::api::scope::EmitMarkEventParams::builder() + .name("checkpoint") + .parent(&agent) + .data(json!({"step": 1})) + .build(), + ) + .unwrap(); + pop(&agent); + clear_plugin_configuration().unwrap(); + + let records = (0..3) + .map(|_| { + sink.receiver + .recv_timeout(Duration::from_secs(2)) + .expect("streamed ATOF record") + }) + .collect::>(); + assert_eq!( + records + .iter() + .map(|record| record["kind"].as_str().unwrap()) + .collect::>(), + vec!["scope", "mark", "scope"] + ); + assert_eq!(records[1]["name"], json!("checkpoint")); +} + #[test] fn atif_defaults_create_one_file_per_top_level_agent() { let _guard = crate::observability::test_mutex().lock().unwrap(); diff --git a/crates/node/observability.d.ts b/crates/node/observability.d.ts index cae4555e..8ad0865c 100644 --- a/crates/node/observability.d.ts +++ b/crates/node/observability.d.ts @@ -13,6 +13,11 @@ export interface AtofConfig { mode?: 'append' | 'overwrite' | string; } +export interface AtofStreamConfig { + enabled?: boolean; + address?: string; +} + export interface S3StorageConfig { type: 's3'; bucket: string; @@ -53,6 +58,7 @@ export interface OtlpConfig { export interface Config { version?: number; atof?: AtofConfig; + atof_stream?: AtofStreamConfig; atif?: AtifConfig; opentelemetry?: OtlpConfig; openinference?: OtlpConfig; @@ -71,6 +77,8 @@ export declare const OBSERVABILITY_PLUGIN_KIND: 'observability'; export declare function defaultConfig(): Config; /** Create filesystem-backed Agent Trajectory Observability Format (ATOF) JSONL settings with defaults applied. */ export declare function atofConfig(config?: AtofConfig): AtofConfig; +/** Create TCP-backed Agent Trajectory Observability Format (ATOF) JSONL stream settings with defaults applied. */ +export declare function atofStreamConfig(config?: AtofStreamConfig): AtofStreamConfig; /** Create per-agent Agent Trajectory Interchange Format (ATIF) trajectory settings with defaults applied. */ export declare function atifConfig(config?: AtifConfig): AtifConfig; /** Create OTLP exporter settings for OpenTelemetry or OpenInference. */ diff --git a/crates/node/observability.js b/crates/node/observability.js index 7ce69c93..9152da46 100644 --- a/crates/node/observability.js +++ b/crates/node/observability.js @@ -32,6 +32,19 @@ function atofConfig(config = {}) { }; } +/** + * Create TCP-backed ATOF JSONL stream settings with defaults applied. + * + * @param {object} [config={}] - Partial ATOF stream settings to override. + * @returns {object} A normalized ATOF stream config object. + */ +function atofStreamConfig(config = {}) { + return { + enabled: false, + ...config, + }; +} + /** * Create per-agent ATIF trajectory settings with defaults applied. * @@ -83,6 +96,7 @@ module.exports = { OBSERVABILITY_PLUGIN_KIND, defaultConfig, atofConfig, + atofStreamConfig, atifConfig, otlpConfig, ComponentSpec, diff --git a/crates/node/tests/observability_plugin_tests.mjs b/crates/node/tests/observability_plugin_tests.mjs index c4e21fc1..de1bf7d0 100644 --- a/crates/node/tests/observability_plugin_tests.mjs +++ b/crates/node/tests/observability_plugin_tests.mjs @@ -21,6 +21,7 @@ describe('observability plugin helpers', () => { it('builds defaults and plugin component shape', () => { assert.deepEqual(observability.defaultConfig(), { version: 1 }); assert.deepEqual(observability.atofConfig(), { enabled: false, mode: 'append' }); + assert.deepEqual(observability.atofStreamConfig(), { enabled: false }); assert.deepEqual(observability.atifConfig(), { enabled: false, agent_name: 'NeMo Relay', @@ -49,11 +50,16 @@ describe('observability plugin helpers', () => { observability.ComponentSpec({ version: 1, atof: observability.atofConfig({ mode: 'bad' }), + atof_stream: observability.atofStreamConfig({ enabled: true }), atif: observability.atifConfig({ filename_template: 'missing-placeholder.json' }), }), ], }); - assert.deepEqual(report.diagnostics.map((diagnostic) => diagnostic.field).sort(), ['filename_template', 'mode']); + assert.deepEqual(report.diagnostics.map((diagnostic) => diagnostic.field).sort(), [ + 'address', + 'filename_template', + 'mode', + ]); }); it('activates ATOF and ATIF file sinks', async () => { diff --git a/docs/observability-plugin/about.mdx b/docs/observability-plugin/about.mdx index 961697b6..776b52ae 100644 --- a/docs/observability-plugin/about.mdx +++ b/docs/observability-plugin/about.mdx @@ -21,6 +21,7 @@ Format (ATIF), OpenTelemetry, or OpenInference. The first-party plugin component has kind `observability`. It can install: - Agent Trajectory Observability Format (ATOF) JSONL export for raw lifecycle events. +- Streaming Agent Trajectory Observability Format (ATOF) JSONL export to a separate receiver process. - Agent Trajectory Interchange Format (ATIF) trajectory export for each top-level agent scope. - OpenTelemetry OTLP trace export. - OpenInference-oriented OTLP trace export. @@ -55,6 +56,7 @@ Choose the exporter based on the downstream system: | Need | Use | |---|---| | Raw canonical event stream | [Agent Trajectory Observability Format (ATOF)](/observability-plugin/atof) | +| Live process-separated raw event stream | [Streaming ATOF Design](/observability-plugin/streaming-atof-design) | | Offline analysis, replay, or evaluation trajectories | [Agent Trajectory Interchange Format (ATIF)](/observability-plugin/atif) | | Generic OTLP traces | [OpenTelemetry](/observability-plugin/opentelemetry) | | OpenInference-oriented agent and LLM spans | [OpenInference](/observability-plugin/openinference) | @@ -85,9 +87,9 @@ are not written into ATIF. - [Observability Configuration](/observability-plugin/configuration) documents the whole plugin component shape, activation, validation, and teardown. - [Agent Trajectory Observability Format (ATOF)](/observability-plugin/atof) covers raw JSONL event stream export. +- [Streaming ATOF Design](/observability-plugin/streaming-atof-design) covers TCP JSONL + streaming to a separate receiver process. - [Agent Trajectory Interchange Format (ATIF)](/observability-plugin/atif) covers per-agent trajectory export. - [OpenTelemetry](/observability-plugin/opentelemetry) covers generic OTLP trace export. - [OpenInference](/observability-plugin/openinference) covers OpenInference-oriented OTLP trace export. -- [Local Cockpit Viewer Design](/observability-plugin/local-cockpit-viewer) proposes a local - viewer boundary on top of Relay events. diff --git a/docs/observability-plugin/atof.mdx b/docs/observability-plugin/atof.mdx index 0744a695..63eccc2e 100644 --- a/docs/observability-plugin/atof.mdx +++ b/docs/observability-plugin/atof.mdx @@ -264,6 +264,10 @@ Register the streaming exporter before the instrumented work starts. The receiver process only observes future events and does not replay earlier lifecycle events. +For plugin-managed streaming, use the Observability plugin `atof_stream` +section. The design details and failure semantics are documented in +[Streaming ATOF Design](/observability-plugin/streaming-atof-design). + A minimal receiver can block on TCP lines and exit when the exporter closes the connection. This keeps short idle gaps from ending the stream: diff --git a/docs/observability-plugin/configuration.mdx b/docs/observability-plugin/configuration.mdx index 3bedd30f..083aec6f 100644 --- a/docs/observability-plugin/configuration.mdx +++ b/docs/observability-plugin/configuration.mdx @@ -36,6 +36,7 @@ only when it includes `enabled: true`. | Section | Runtime behavior | |---|---| | `atof` | Registers a global Agent Trajectory Observability Format (ATOF) JSONL exporter for raw lifecycle events. | +| `atof_stream` | Registers a global TCP-backed ATOF JSONL stream exporter to a separate receiver process. | | `atif` | Registers one Agent Trajectory Interchange Format (ATIF) dispatcher that writes one trajectory file for each top-level agent scope. | | `opentelemetry` | Registers a global OpenTelemetry OTLP subscriber. | | `openinference` | Registers a global OpenInference OTLP subscriber. | @@ -45,6 +46,7 @@ component-local subscriber names and registers them under the observability plugin namespace: - Agent Trajectory Observability Format (ATOF): `__nemo_relay_plugin__observability__atof` +- Streaming Agent Trajectory Observability Format (ATOF): `__nemo_relay_plugin__observability__atof_stream` - Agent Trajectory Interchange Format (ATIF) dispatcher: `__nemo_relay_plugin__observability__atif` - Per-agent ATIF scope subscriber: `__nemo_relay_plugin__observability__atif-{agent_scope_uuid}` - OpenTelemetry: `__nemo_relay_plugin__observability__opentelemetry` @@ -68,6 +70,10 @@ output_directory = "logs" filename = "events.jsonl" mode = "overwrite" +[components.config.atof_stream] +enabled = true +address = "127.0.0.1:43199" + [components.config.atif] enabled = true output_directory = "logs" @@ -128,6 +134,7 @@ from nemo_relay import plugin, scope, ScopeType from nemo_relay.observability import ( AtifConfig, AtofConfig, + AtofStreamConfig, ComponentSpec, ObservabilityConfig, OtlpConfig, @@ -143,6 +150,10 @@ config = plugin.PluginConfig( filename="events.jsonl", mode="overwrite", ), + atof_stream=AtofStreamConfig( + enabled=True, + address="127.0.0.1:43199", + ), atif=AtifConfig( enabled=True, output_directory="logs", @@ -201,6 +212,10 @@ await plugin.initialize({ filename: "events.jsonl", mode: "overwrite", }), + atof_stream: observability.atofStreamConfig({ + enabled: true, + address: "127.0.0.1:43199", + }), atif: observability.atifConfig({ enabled: true, output_directory: "logs", @@ -244,8 +259,8 @@ try { ```rust use nemo_relay::observability::plugin_component::{ - AtifSectionConfig, AtofSectionConfig, ComponentSpec, ObservabilityConfig, - OtlpSectionConfig, + AtifSectionConfig, AtofSectionConfig, AtofStreamSectionConfig, + ComponentSpec, ObservabilityConfig, OtlpSectionConfig, }; use nemo_relay::plugin::{initialize_plugins, validate_plugin_config, PluginConfig}; @@ -256,6 +271,10 @@ let component = ComponentSpec::new(ObservabilityConfig { filename: Some("events.jsonl".into()), mode: "overwrite".into(), }), + atof_stream: Some(AtofStreamSectionConfig { + enabled: true, + address: Some("127.0.0.1:43199".into()), + }), atif: Some(AtifSectionConfig { enabled: true, output_directory: Some("logs".into()), diff --git a/docs/observability-plugin/local-cockpit-viewer.mdx b/docs/observability-plugin/local-cockpit-viewer.mdx deleted file mode 100644 index 9fe2e789..00000000 --- a/docs/observability-plugin/local-cockpit-viewer.mdx +++ /dev/null @@ -1,234 +0,0 @@ ---- -title: "Local Cockpit Viewer Design" -description: "" -position: 7 ---- -{/* SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. -SPDX-License-Identifier: Apache-2.0 */} - -This page proposes a local cockpit viewer for NeMo Relay observability data. -The viewer is a consumer of Relay events, not a second tracing system. Relay -continues to own capture, lifecycle events, ATOF, ATIF, exporter contracts, -sanitization hooks, and missing-field semantics. The cockpit owns local -developer UX on top of those contracts. - -## Problem - -Coding agents can run across shells, editors, providers, plugins, tools, and -subagents. Developers often need to answer practical questions while a run is -still warm: - -- What session or prompt is active? -- Which model, tool, file, skill, memory, or subagent drove the result? -- Which token or cost fields are observed, and which are unavailable? -- Which hook or provider limitation explains missing trace detail? -- What can be safely shared, replayed, or resumed? - -File export alone is useful for post-run analysis, but local agent work also -needs a live read-only surface. The goal is to make traces inspectable without -turning NeMo Relay into an opinionated dashboard product. - -## Product Boundary - -Relay should provide the generic substrate that every downstream consumer can -reuse: - -- Runtime capture and lifecycle events. -- ATOF as the Relay-native event stream. -- ATIF and other exporter formats for trajectory and trace consumers. -- Streaming export from the instrumented process to a separate local process. -- Token, cost, and capture-quality fields when they are broadly useful. -- Sanitizer and redaction extension points before data leaves the developer - machine or process boundary. -- Stable examples and fixtures that show how a viewer consumes Relay output. - -The cockpit should provide the developer-facing experience: - -- Session list, prompt timeline, event tree, file impact, tool calls, skills, - memory, subagents, and usage panels. -- Linked selection between timeline, event details, context, files, and usage. -- Clear provenance labels such as `observed`, `inferred`, `unavailable`, or - `unsupported`. -- Local-only read mode by default. -- Optional packaging outside Relay if the UX grows into a separate application. - -This separation lets Relay stay the source of truth for events while still -supporting polished local UIs. - -## Proposed Shape - -The first useful contribution can be small: - -1. Keep the streaming ATOF exporter as a low-level Relay capability. -2. Add a documented local viewer contract for consumers of the ATOF stream. -3. Add one tiny example receiver that reads ATOF JSONL from the streaming - exporter and exposes it to a browser or CLI. -4. Keep the full cockpit implementation outside the runtime unless maintainers - decide that a reference viewer belongs in this repository. - -The important design constraint is process separation. An agent process should -emit events to a local receiver over a socket or HTTP-style boundary. The viewer -process should own UI state, filtering, fan-out, and browser transport. That -keeps the agent runtime lean and lets the receiver restart or evolve without -replacing Relay capture. - -```mermaid -flowchart LR - Agent["Instrumented agent or harness"] --> Relay["NeMo Relay runtime"] - Relay --> ATOF["Streaming ATOF exporter"] - ATOF --> Receiver["Local receiver process"] - Receiver --> Browser["Local cockpit UI"] - Receiver --> CLI["CLI inspection"] - Receiver --> Files["Optional local trace files"] -``` - -## Event Contract - -The viewer should treat ATOF as the live event source and ATIF as a trajectory -or replay artifact. ATOF is the right stream for timeline and live state because -it preserves lifecycle order. ATIF is the right shape for post-run trajectory -analysis, evaluation, and interchange. - -Viewer consumers need field provenance. A field should not silently appear more -complete than the capture layer can prove. Suggested provenance values: - -| Provenance | Meaning | -|---|---| -| `observed` | Relay captured the field directly from runtime, provider, or wrapper data. | -| `derived` | Relay derived the field from other observed Relay data. | -| `inferred` | A viewer inferred the field from local files, names, timestamps, or heuristics. | -| `unavailable` | Relay knows the field exists conceptually but did not capture it. | -| `unsupported` | The current provider, hook, or integration cannot expose the field. | - -Examples: - -- Token counts should be `observed` only when Relay receives real usage fields. -- Cost can be `derived` only from observed token counts plus a declared pricing - source. It should not claim invoice truth. -- Missing Codex hooks should be visible as `unsupported` or `unavailable` - instead of being papered over by UI guesses. -- Local file-touch summaries can be `inferred` if they come from local history - rather than Relay events. - -## Capture Quality Signals - -Relay should expose generic capture-quality signals when they help every -consumer: - -- Provider supported or unsupported. -- Hook present or missing. -- LLM span unavailable. -- Tool call input or output redacted. -- Token usage unavailable. -- Cost unavailable or derived from local policy. -- Subagent hierarchy unavailable. -- Event dropped due to backpressure. -- Exporter disconnected or receiver unavailable. - -These signals are more useful than blank panels because they tell the developer -whether the system is quiet, incomplete, or unsupported. - -## Local Viewer UX - -A good local cockpit should help developers answer questions quickly: - -- A left rail lists runs and sessions by human-readable title, provider, age, - state, and workspace. -- A center timeline shows prompt, scope, tool, LLM, file, and subagent events. -- A detail pane explains the selected event with exact field provenance. -- A usage panel shows per-session token and cost evidence, not only totals. -- A file-impact view groups touched files by path and highlights hot areas. -- A compare mode shows two runs side by side when debugging regressions. - -The viewer should avoid pretending to be complete. Empty panels should say why -the data is missing and which integration or hook would make it available. - -## Packaging Options - -There are three reasonable packaging paths: - -| Option | What ships in Relay | What ships outside Relay | -|---|---|---| -| Example viewer | Minimal receiver and browser example | Full product cockpit | -| Companion package | Viewer package maintained beside Relay APIs | Enterprise installers and branding | -| External app | Only exporter contract, fixtures, and docs | Entire cockpit implementation | - -The proposed default is example viewer first. It validates the Relay contract -without committing Relay maintainers to own a full UI product. If the example -becomes broadly useful, it can graduate to a companion package. - -## Redaction And Sharing - -Local-first does not remove the need for sanitization. Before traces are shared -outside the developer machine, the pipeline should support: - -- Provider payload redaction. -- File path and workspace redaction. -- Secret pattern redaction. -- Prompt and response redaction. -- Clear marking of redacted fields in the viewer. -- Export policies that separate local inspection from shareable artifacts. - -The viewer should display redaction status explicitly so reviewers know whether -they are looking at raw local evidence or a share-safe trace. - -## Non-Goals - -The first design should not: - -- Replace ATIF, OpenTelemetry, or OpenInference. -- Define a new tracing schema parallel to Relay events. -- Require Relay to own a large frontend application. -- Claim real billing cost without observed usage and an explicit pricing source. -- Infer missing provider data without provenance. - -## Milestones - -### M0: Alignment - -- Agree that the cockpit is a Relay event consumer. -- Agree that Relay owns generic event/exporter/schema pieces. -- Decide whether the first viewer artifact should be an example, companion - package, or separate application. - -### M1: Streaming ATOF - -- Provide a streaming ATOF exporter from the instrumented process to a local - receiver process. -- Document backpressure, disconnect, shutdown, and dropped-event behavior. -- Add socket-based tests that model a separate receiver process. - -### M2: Viewer Contract - -- Document the event fields and provenance states the viewer can rely on. -- Add a fixture trace that includes prompt, scope, tool, LLM, token, missing - field, and redaction examples. -- Provide a minimal receiver that proves live consumption. - -### M3: Local Cockpit Prototype - -- Render a session list, event timeline, selected-event inspector, and per-run - usage evidence. -- Label unavailable fields instead of filling them with guesses. -- Keep the prototype local-only and read-only by default. - -### M4: Product Decision - -- Decide whether the UI remains a separate project, becomes a Relay companion - package, or stays as a reference example. -- Promote only generic contracts and reusable utilities into Relay. - -## Open Questions - -1. Should Relay ship only a minimal example viewer, or should it own a companion - viewer package? -2. Should live viewers consume ATOF directly, or should Relay add a receiver - process that transforms ATOF into browser-friendly Server-Sent Events or - WebSocket messages? -3. Which token and cost fields should be first-class Relay fields versus - viewer-local derived values? -4. What is the preferred sanitizer API for share-safe local traces? -5. Which Codex, Claude Code, Cursor, or harness hooks should emit explicit - capture-quality signals when data is missing? -6. Should cockpit examples live in `examples/`, `integrations/`, or the docs - site? diff --git a/docs/observability-plugin/streaming-atof-design.mdx b/docs/observability-plugin/streaming-atof-design.mdx new file mode 100644 index 00000000..9f51162f --- /dev/null +++ b/docs/observability-plugin/streaming-atof-design.mdx @@ -0,0 +1,113 @@ +--- +title: "Streaming ATOF Design" +sidebar-title: "Streaming ATOF Design" +description: "" +position: 7 +--- +{/* SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +SPDX-License-Identifier: Apache-2.0 */} + +This design covers streaming Agent Trajectory Observability Format (ATOF) +events from an instrumented process to a separate receiver process. + +It is intentionally limited to the NeMo Relay substrate: capture, plugin +configuration, raw ATOF emission, delivery behavior, validation, and shutdown. +Viewer UI, dashboards, browser surfaces, and product-specific applications are +downstream consumers of this stream and are not part of this design. + +## Problem + +The filesystem ATOF exporter writes useful canonical JSONL, but a local +receiver cannot inspect events until it can read the file. Tooling that wants a +live view of agent scopes, marks, tool calls, LLM calls, and lifecycle events +needs a process-separated stream without embedding a UI server or viewer inside +the instrumented runtime. + +## Goals + +- Emit canonical ATOF JSON objects as JSONL over a TCP connection. +- Keep the receiver in a separate process from the instrumented agent runtime. +- Expose the stream through the Observability plugin config shape. +- Keep event production off the hot path with bounded buffering. +- Preserve the existing ATOF event contract and sanitizer behavior. +- Surface configuration and delivery failures explicitly. + +## Non-Goals + +- No browser UI, local dashboard, or product-specific viewer implementation. +- No HTTP, Server-Sent Events, WebSocket, or multi-client fan-out server. +- No replacement for ATIF, OpenTelemetry, or OpenInference exporters. +- No durable replay, backfill, or local database. +- No new PII redaction policy in this change. + +## Plugin Configuration + +The Observability plugin gets a dedicated `atof_stream` section: + +```toml +[components.config.atof_stream] +enabled = true +address = "127.0.0.1:43199" +``` + +The receiver must already be listening at `address` before plugin +initialization. Missing or blank `address` is a validation error when +`enabled = true`. + +## Architecture + +```mermaid +flowchart LR + App["Instrumented app or agent"] --> Events["NeMo Relay ATOF events"] + Events --> Plugin["Observability plugin"] + Plugin --> Exporter["AtofStreamingExporter"] + Exporter -->|TCP JSONL| Receiver["Separate receiver process"] + Receiver --> Consumer["Downstream consumer"] +``` + +The Observability plugin owns registration and teardown. The stream subscriber +is registered under: + +```text +__nemo_relay_plugin__observability__atof_stream +``` + +## Runtime Behavior + +`AtofStreamingExporter` connects to one TCP receiver and writes one JSON object +per line. It uses a bounded per-exporter queue so event producers do not block +on receiver I/O. If the receiver cannot keep up and the queue is full, the +newest event is dropped and `stats().events_dropped` records the overflow. + +Connection failures during initialization fail plugin registration. Write or +flush failures are recorded as exporter errors and surfaced during flush or +shutdown. Shutdown flushes queued events, closes the stream, and lets the +receiver observe EOF. + +## Failure Semantics + +| Condition | Behavior | +|---|---| +| `atof_stream.enabled = true` without `address` | Validation error. | +| Receiver is not listening | Plugin initialization fails. | +| Receiver disconnects mid-run | Exporter records the stream error; flush/shutdown surfaces it. | +| Bounded queue is full | Newest event is dropped; dropped count is available through exporter stats. | +| Plugin clear/shutdown | Subscriber is deregistered, queued events are flushed, stream closes. | + +## Validation Plan + +- Unit test plugin schema/editor support for `atof_stream`. +- Unit test validation for missing stream address. +- Unit test plugin initialization registers `atof_stream` under the expected + subscriber name. +- Unit test a separate TCP receiver gets `scope`, `mark`, and `scope` records + and observes clean shutdown. +- Binding helper tests cover Python, Go, and Node.js config helper shapes. + +## Open Questions + +- Should future stream receivers support authenticated local sockets or Unix + domain sockets? +- Should delivery stats become a generic exporter health surface? +- Should redaction hooks become configurable before an event reaches every + exporter? diff --git a/go/nemo_relay/observability_plugin.go b/go/nemo_relay/observability_plugin.go index 2b72b8d2..910638d0 100644 --- a/go/nemo_relay/observability_plugin.go +++ b/go/nemo_relay/observability_plugin.go @@ -8,12 +8,13 @@ const ObservabilityPluginKind = "observability" // ObservabilityConfig is the canonical Go shape for the observability plugin config document. type ObservabilityConfig struct { - Version uint32 `json:"version,omitempty"` - Atof *ObservabilityAtofConfig `json:"atof,omitempty"` - Atif *ObservabilityAtifConfig `json:"atif,omitempty"` - OpenTelemetry *ObservabilityOtlpConfig `json:"opentelemetry,omitempty"` - OpenInference *ObservabilityOtlpConfig `json:"openinference,omitempty"` - Policy *ConfigPolicy `json:"policy,omitempty"` + Version uint32 `json:"version,omitempty"` + Atof *ObservabilityAtofConfig `json:"atof,omitempty"` + AtofStream *ObservabilityAtofStreamConfig `json:"atof_stream,omitempty"` + Atif *ObservabilityAtifConfig `json:"atif,omitempty"` + OpenTelemetry *ObservabilityOtlpConfig `json:"opentelemetry,omitempty"` + OpenInference *ObservabilityOtlpConfig `json:"openinference,omitempty"` + Policy *ConfigPolicy `json:"policy,omitempty"` } // ObservabilityAtofConfig configures filesystem-backed raw ATOF JSONL export. @@ -24,6 +25,12 @@ type ObservabilityAtofConfig struct { Mode string `json:"mode,omitempty"` } +// ObservabilityAtofStreamConfig configures TCP-backed raw ATOF JSONL stream export. +type ObservabilityAtofStreamConfig struct { + Enabled bool `json:"enabled,omitempty"` + Address string `json:"address,omitempty"` +} + // ObservabilityAtifConfig configures per-top-level-agent ATIF file export. type ObservabilityAtifConfig struct { Enabled bool `json:"enabled,omitempty"` @@ -68,6 +75,11 @@ func NewObservabilityAtofConfig() ObservabilityAtofConfig { } } +// NewObservabilityAtofStreamConfig returns disabled ATOF stream settings. +func NewObservabilityAtofStreamConfig() ObservabilityAtofStreamConfig { + return ObservabilityAtofStreamConfig{} +} + // NewObservabilityAtifConfig returns disabled ATIF settings with core defaults. func NewObservabilityAtifConfig() ObservabilityAtifConfig { return ObservabilityAtifConfig{ diff --git a/go/nemo_relay/observability_plugin_test.go b/go/nemo_relay/observability_plugin_test.go index f5a7fbe4..9da83dbc 100644 --- a/go/nemo_relay/observability_plugin_test.go +++ b/go/nemo_relay/observability_plugin_test.go @@ -31,6 +31,10 @@ func TestObservabilityConfigHelpers(t *testing.T) { if atof.Enabled || atof.Mode != "append" { t.Fatalf("unexpected ATOF defaults: %#v", atof) } + atofStream := NewObservabilityAtofStreamConfig() + if atofStream.Enabled || atofStream.Address != "" { + t.Fatalf("unexpected ATOF stream defaults: %#v", atofStream) + } atif := NewObservabilityAtifConfig() if atif.Enabled || atif.AgentName != "NeMo Relay" || atif.ModelName != "unknown" || atif.FilenameTemplate != "nemo-relay-atif-{session_id}.json" { t.Fatalf("unexpected ATIF defaults: %#v", atif) @@ -41,6 +45,7 @@ func TestObservabilityConfigHelpers(t *testing.T) { } config.Atof = &atof + config.AtofStream = &atofStream wrapped := ObservabilityComponent(config) if wrapped.Kind != ObservabilityPluginKind || !wrapped.Enabled { t.Fatalf("unexpected component wrapper: %#v", wrapped) @@ -48,6 +53,9 @@ func TestObservabilityConfigHelpers(t *testing.T) { if _, ok := wrapped.Config["atof"].(map[string]any); !ok { t.Fatalf("expected serialized ATOF config object, got %#v", wrapped.Config) } + if _, ok := wrapped.Config["atof_stream"].(map[string]any); !ok { + t.Fatalf("expected serialized ATOF stream config object, got %#v", wrapped.Config) + } } func TestObservabilityPluginAtofAndAtifFiles(t *testing.T) { @@ -179,6 +187,9 @@ func TestObservabilityPluginValidationRejectsBadValues(t *testing.T) { atof := NewObservabilityAtofConfig() atof.Mode = "bad" config.Atof = &atof + atofStream := NewObservabilityAtofStreamConfig() + atofStream.Enabled = true + config.AtofStream = &atofStream atif := NewObservabilityAtifConfig() atif.FilenameTemplate = "missing-placeholder.json" config.Atif = &atif @@ -187,7 +198,7 @@ func TestObservabilityPluginValidationRejectsBadValues(t *testing.T) { if err != nil { t.Fatalf("ValidatePluginConfig failed: %v", err) } - if len(report.Diagnostics) < 2 { + if len(report.Diagnostics) < 3 { t.Fatalf("expected validation diagnostics, got %#v", report.Diagnostics) } } diff --git a/python/nemo_relay/observability.py b/python/nemo_relay/observability.py index 9917f1f2..8c88b761 100644 --- a/python/nemo_relay/observability.py +++ b/python/nemo_relay/observability.py @@ -73,6 +73,23 @@ def to_dict(self) -> JsonObject: ) +@dataclass(slots=True) +class AtofStreamConfig: + """TCP-backed raw ATOF JSONL stream export settings.""" + + enabled: bool = False + address: str | None = None + + def to_dict(self) -> JsonObject: + """Serialize this ATOF stream config to the canonical JSON object shape.""" + return _normalize_object( + { + "enabled": self.enabled, + "address": self.address, + } + ) + + @dataclass(slots=True) class S3StorageConfig: """S3-compatible remote storage settings for ATIF trajectory upload. @@ -181,6 +198,7 @@ class ObservabilityConfig: version: int = 1 atof: AtofConfig | None = None + atof_stream: AtofStreamConfig | None = None atif: AtifConfig | None = None opentelemetry: OtlpConfig | None = None openinference: OtlpConfig | None = None @@ -192,6 +210,7 @@ def to_dict(self) -> JsonObject: { "version": self.version, "atof": self.atof, + "atof_stream": self.atof_stream, "atif": self.atif, "opentelemetry": self.opentelemetry, "openinference": self.openinference, @@ -222,6 +241,7 @@ def to_dict(self) -> JsonObject: __all__ = [ "ConfigPolicy", "AtofConfig", + "AtofStreamConfig", "AtifConfig", "S3StorageConfig", "OtlpConfig", diff --git a/python/nemo_relay/observability.pyi b/python/nemo_relay/observability.pyi index 405cb39b..36abd928 100644 --- a/python/nemo_relay/observability.pyi +++ b/python/nemo_relay/observability.pyi @@ -25,6 +25,12 @@ class AtofConfig: mode: Literal["append", "overwrite"] = ... def to_dict(self) -> JsonObject: ... +@dataclass(slots=True) +class AtofStreamConfig: + enabled: bool = ... + address: str | None = ... + def to_dict(self) -> JsonObject: ... + @dataclass(slots=True) class S3StorageConfig: bucket: str = ... @@ -68,6 +74,7 @@ class OtlpConfig: class ObservabilityConfig: version: int = ... atof: AtofConfig | None = ... + atof_stream: AtofStreamConfig | None = ... atif: AtifConfig | None = ... opentelemetry: OtlpConfig | None = ... openinference: OtlpConfig | None = ... diff --git a/python/tests/test_observability_plugin.py b/python/tests/test_observability_plugin.py index dba92d04..87c38b38 100644 --- a/python/tests/test_observability_plugin.py +++ b/python/tests/test_observability_plugin.py @@ -15,6 +15,7 @@ OBSERVABILITY_PLUGIN_KIND, AtifConfig, AtofConfig, + AtofStreamConfig, ComponentSpec, ObservabilityConfig, OtlpConfig, @@ -28,6 +29,7 @@ class TestObservabilityConfigHelpers: def test_defaults_and_component_wrapper(self): assert AtofConfig().to_dict() == {"enabled": False, "mode": "append"} + assert AtofStreamConfig().to_dict() == {"enabled": False} assert AtifConfig().to_dict() == { "enabled": False, "agent_name": "NeMo Relay", @@ -58,6 +60,7 @@ def test_validation_rejects_bad_values(self): { "version": 1, "atof": {"mode": "bad"}, + "atof_stream": {"enabled": True}, "atif": {"filename_template": "missing-placeholder"}, } ) @@ -65,7 +68,7 @@ def test_validation_rejects_bad_values(self): ) ) fields = {diag.get("field") for diag in report["diagnostics"]} - assert {"mode", "filename_template"} <= fields + assert {"mode", "address", "filename_template"} <= fields def test_list_kinds_includes_builtin_observability(self): assert OBSERVABILITY_PLUGIN_KIND in plugin.list_kinds()