diff --git a/crates/cli/src/plugins/editor_model.rs b/crates/cli/src/plugins/editor_model.rs index 4c8fab0e..3bcbf1cd 100644 --- a/crates/cli/src/plugins/editor_model.rs +++ b/crates/cli/src/plugins/editor_model.rs @@ -33,9 +33,9 @@ pub(super) struct ComponentEditorState { #[derive(Debug)] pub(super) enum EditableComponent { - Observability(ComponentEditorState), - Adaptive(ComponentEditorState), - NemoGuardrails(ComponentEditorState), + Observability(Box>), + Adaptive(Box>), + NemoGuardrails(Box>), } impl EditableComponent { @@ -148,9 +148,9 @@ pub(super) fn editable_components( config: &PluginConfig, ) -> Result, CliError> { Ok(vec![ - EditableComponent::Observability(component_observability_state(config)?), - EditableComponent::Adaptive(component_adaptive_state(config)?), - EditableComponent::NemoGuardrails(component_nemo_guardrails_state(config)?), + EditableComponent::Observability(Box::new(component_observability_state(config)?)), + EditableComponent::Adaptive(Box::new(component_adaptive_state(config)?)), + EditableComponent::NemoGuardrails(Box::new(component_nemo_guardrails_state(config)?)), ]) } diff --git a/crates/core/src/observability/atof.rs b/crates/core/src/observability/atof.rs index d1bca1d2..17548293 100644 --- a/crates/core/src/observability/atof.rs +++ b/crates/core/src/observability/atof.rs @@ -10,8 +10,11 @@ use std::fs::{File, OpenOptions}; use std::io::{BufWriter, Write}; +use std::net::{Shutdown, TcpStream}; use std::path::{Path, PathBuf}; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Mutex, mpsc}; +use std::thread::JoinHandle; +use std::time::Duration; use chrono::Utc; @@ -45,6 +48,28 @@ pub enum AtofExporterError { /// Underlying I/O error. source: std::io::Error, }, + /// Failed to connect to an ATOF stream receiver. + #[error("failed to connect to ATOF stream receiver {address}: {source}")] + ConnectStream { + /// Address that failed to connect. + address: String, + /// Underlying I/O error. 
+ source: std::io::Error, + }, + /// Failed to configure the ATOF stream connection. + #[error( + "failed to configure ATOF stream receiver {address} with {operation} (ATOF_STREAM_WRITE_TIMEOUT={timeout:?}): {source}" + )] + ConfigureStream { + /// Address associated with the stream. + address: String, + /// Stream option that failed. + operation: &'static str, + /// Write timeout used when configuring the stream. + timeout: Option, + /// Underlying I/O error. + source: std::io::Error, + }, /// The exporter recorded an earlier write or serialization error. #[error("previous ATOF export failed for {path:?}: {message}")] StoredFailure { @@ -53,6 +78,14 @@ pub enum AtofExporterError { /// Stored failure message. message: String, }, + /// The streaming exporter recorded an earlier write or serialization error. + #[error("previous ATOF stream export failed for {address}: {message}")] + StoredStreamFailure { + /// Address associated with the stream. + address: String, + /// Stored failure message. + message: String, + }, /// The internal exporter state lock was poisoned. #[error("the ATOF exporter state lock was poisoned")] LockPoisoned, @@ -225,6 +258,307 @@ impl AtofExporter { } } +/// Configuration for [`AtofStreamingExporter`]. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct AtofStreamingExporterConfig { + /// TCP address for a separate local process that receives ATOF JSONL events. + pub address: String, +} + +impl AtofStreamingExporterConfig { + /// Create a streaming exporter config for the given TCP address. 
+ pub fn new(address: impl Into) -> Self { + Self { + address: address.into(), + } + } +} + +const ATOF_STREAM_QUEUE_BOUND: usize = 1024; +const ATOF_STREAM_WRITE_TIMEOUT: Duration = Duration::from_secs(2); + +enum AtofStreamMessage { + Event(String), + Flush(mpsc::Sender>), + Shutdown(mpsc::Sender>), +} + +struct AtofStreamingExporterState { + sender: Option>, + writer_thread: Option>, + events_sent: u64, + events_dropped: u64, + last_error: Arc>>, +} + +/// Snapshot of [`AtofStreamingExporter`] delivery state. +#[derive(Debug, Clone, Default, PartialEq, Eq)] +pub struct AtofStreamingExporterStats { + /// Number of ATOF events observed by the streaming exporter. + pub events_sent: u64, + /// Number of ATOF events dropped because the bounded streaming queue was full. + pub events_dropped: u64, + /// Most recent serialization or exporter state error, if one was recorded. + pub last_error: Option, +} + +/// TCP-backed Agent Trajectory Observability Format (ATOF) event stream exporter. +/// +/// The exporter exposes a regular NeMo Relay event subscriber and writes each +/// canonical ATOF JSON value as one JSONL line to a separate local process over +/// a TCP connection. A local UI, CLI, or bridge process can own the receiving +/// socket and fan events out over HTTP, SSE, WebSocket, stdout, or another +/// transport without redefining the ATOF event contract. +#[derive(Clone)] +pub struct AtofStreamingExporter { + address: String, + state: Arc>, +} + +impl AtofStreamingExporter { + /// Connect to a separate local ATOF stream receiver. 
+ pub fn new(config: AtofStreamingExporterConfig) -> Result { + let address = config.address; + let stream = + TcpStream::connect(&address).map_err(|source| AtofExporterError::ConnectStream { + address: address.clone(), + source, + })?; + stream + .set_nodelay(true) + .map_err(|source| AtofExporterError::ConfigureStream { + address: address.clone(), + operation: "set_nodelay", + timeout: None, + source, + })?; + stream + .set_write_timeout(Some(ATOF_STREAM_WRITE_TIMEOUT)) + .map_err(|source| AtofExporterError::ConfigureStream { + address: address.clone(), + operation: "set_write_timeout", + timeout: Some(ATOF_STREAM_WRITE_TIMEOUT), + source, + })?; + let (sender, receiver) = mpsc::sync_channel(ATOF_STREAM_QUEUE_BOUND); + let last_error = Arc::new(Mutex::new(None)); + let writer_error = Arc::clone(&last_error); + let writer_thread = std::thread::spawn(move || { + let mut writer = BufWriter::new(stream); + while let Ok(message) = receiver.recv() { + match message { + AtofStreamMessage::Event(value) => { + if let Err(error) = write_serialized_event(&mut writer, &value) { + store_stream_error(&writer_error, error); + } + } + AtofStreamMessage::Flush(reply) => { + let result = writer.flush().map_err(|error| error.to_string()); + if let Err(error) = &result { + store_stream_error(&writer_error, error.clone()); + } + let _ = reply.send(result); + } + AtofStreamMessage::Shutdown(reply) => { + let result = writer.flush().map_err(|error| error.to_string()); + if let Err(error) = &result { + store_stream_error(&writer_error, error.clone()); + } + let _ = writer.get_ref().shutdown(Shutdown::Both); + let _ = reply.send(result); + break; + } + } + } + }); + Ok(Self { + address, + state: Arc::new(Mutex::new(AtofStreamingExporterState { + sender: Some(sender), + writer_thread: Some(writer_thread), + events_sent: 0, + events_dropped: 0, + last_error, + })), + }) + } + + /// Connect to a separate local ATOF stream receiver at the given TCP address. 
+ pub fn connect(address: impl Into) -> Result { + Self::new(AtofStreamingExporterConfig::new(address)) + } + + /// Return the connected stream receiver address. + pub fn address(&self) -> &str { + &self.address + } + + /// Return an event subscriber that writes one canonical JSONL record per event. + pub fn subscriber(&self) -> EventSubscriberFn { + let state = Arc::clone(&self.state); + Arc::new(move |event: &Event| { + let value = match serialize_event(event) { + Ok(value) => value, + Err(error) => { + if let Ok(state) = state.lock() { + store_stream_error(&state.last_error, error); + } + return; + } + }; + let Ok(mut state) = state.lock() else { + return; + }; + if stream_last_error(&state.last_error).is_some() { + return; + } + let Some(sender) = state.sender.as_ref() else { + store_stream_error(&state.last_error, "stream receiver is closed".to_string()); + return; + }; + match sender.try_send(AtofStreamMessage::Event(value)) { + Ok(()) => { + state.events_sent += 1; + } + Err(mpsc::TrySendError::Full(_)) => { + state.events_dropped += 1; + } + Err(mpsc::TrySendError::Disconnected(_)) => { + store_stream_error( + &state.last_error, + "ATOF stream writer is disconnected".to_string(), + ); + } + } + }) + } + + /// Register this streaming exporter globally under the given subscriber name. + pub fn register(&self, name: &str) -> Result<()> { + register_subscriber(name, self.subscriber()).map_err(Into::into) + } + + /// Deregister a global subscriber by name. + pub fn deregister(&self, name: &str) -> Result { + deregister_subscriber(name).map_err(Into::into) + } + + /// Flush the stream and report any stored write error. 
+ pub fn force_flush(&self) -> Result<()> { + let (sender, last_error) = { + let state = self + .state + .lock() + .map_err(|_| AtofExporterError::LockPoisoned)?; + if let Some(message) = stream_last_error(&state.last_error) { + return Err(AtofExporterError::StoredStreamFailure { + address: self.address.clone(), + message, + }); + } + (state.sender.clone(), Arc::clone(&state.last_error)) + }; + let Some(sender) = sender else { + return Ok(()); + }; + let (reply_sender, reply_receiver) = mpsc::channel(); + if sender.send(AtofStreamMessage::Flush(reply_sender)).is_err() { + return Err(AtofExporterError::StoredStreamFailure { + address: self.address.clone(), + message: "ATOF stream writer is disconnected".to_string(), + }); + } + match reply_receiver.recv() { + Ok(Ok(())) => { + if let Some(message) = stream_last_error(&last_error) { + return Err(AtofExporterError::StoredStreamFailure { + address: self.address.clone(), + message, + }); + } + Ok(()) + } + Ok(Err(message)) => Err(AtofExporterError::StoredStreamFailure { + address: self.address.clone(), + message, + }), + Err(error) => Err(AtofExporterError::StoredStreamFailure { + address: self.address.clone(), + message: error.to_string(), + }), + } + } + + /// Shut down the stream by flushing and closing the TCP connection. 
+ pub fn shutdown(&self) -> Result<()> { + let flush_result = self.force_flush(); + let (sender, writer_thread, last_error) = { + let mut state = self + .state + .lock() + .map_err(|_| AtofExporterError::LockPoisoned)?; + ( + state.sender.take(), + state.writer_thread.take(), + Arc::clone(&state.last_error), + ) + }; + let shutdown_result = if let Some(sender) = sender { + let (reply_sender, reply_receiver) = mpsc::channel(); + let send_result = sender + .send(AtofStreamMessage::Shutdown(reply_sender)) + .map_err(|_| AtofExporterError::StoredStreamFailure { + address: self.address.clone(), + message: "ATOF stream writer is disconnected".to_string(), + }); + match send_result { + Ok(()) => match reply_receiver.recv() { + Ok(Ok(())) => Ok(()), + Ok(Err(message)) => Err(AtofExporterError::StoredStreamFailure { + address: self.address.clone(), + message, + }), + Err(error) => Err(AtofExporterError::StoredStreamFailure { + address: self.address.clone(), + message: error.to_string(), + }), + }, + Err(error) => Err(error), + } + } else { + Ok(()) + }; + if let Some(writer_thread) = writer_thread { + let _ = writer_thread.join(); + } + let stored_result = + stream_last_error(&last_error).map(|message| AtofExporterError::StoredStreamFailure { + address: self.address.clone(), + message, + }); + match (flush_result, shutdown_result) { + (Err(error), _) => Err(error), + (Ok(()), Err(error)) => Err(error), + (Ok(()), Ok(())) => stored_result.map_or(Ok(()), Err), + } + } + + /// Return a point-in-time delivery snapshot for diagnostics and tests. 
+ pub fn stats(&self) -> AtofStreamingExporterStats { + let Ok(state) = self.state.lock() else { + return AtofStreamingExporterStats { + last_error: Some("the ATOF streaming exporter state lock was poisoned".to_string()), + ..AtofStreamingExporterStats::default() + }; + }; + AtofStreamingExporterStats { + events_sent: state.events_sent, + events_dropped: state.events_dropped, + last_error: stream_last_error(&state.last_error), + } + } +} + fn default_filename() -> String { format!( "nemo-relay-events-{}.jsonl", @@ -251,15 +585,35 @@ fn open_file(path: &Path, mode: AtofExporterMode) -> Result { }) } -fn write_event(writer: &mut BufWriter, event: &Event) -> std::result::Result<(), String> { +fn write_event(writer: &mut impl Write, event: &Event) -> std::result::Result<(), String> { + write_serialized_event(writer, &serialize_event(event)?) +} + +fn serialize_event(event: &Event) -> std::result::Result { let value = event .try_to_json_value() .map_err(|error| error.to_string())?; - serde_json::to_writer(&mut *writer, &value).map_err(|error| error.to_string())?; + serde_json::to_string(&value).map_err(|error| error.to_string()) +} + +fn write_serialized_event(writer: &mut impl Write, value: &str) -> std::result::Result<(), String> { + writer + .write_all(value.as_bytes()) + .map_err(|error| error.to_string())?; writer.write_all(b"\n").map_err(|error| error.to_string())?; writer.flush().map_err(|error| error.to_string()) } +fn store_stream_error(last_error: &Arc>>, error: String) { + if let Ok(mut last_error) = last_error.lock() { + last_error.get_or_insert(error); + } +} + +fn stream_last_error(last_error: &Arc>>) -> Option { + last_error.lock().ok().and_then(|error| error.clone()) +} + // --------------------------------------------------------------------------- // Tests // --------------------------------------------------------------------------- diff --git a/crates/core/src/observability/plugin_component.rs b/crates/core/src/observability/plugin_component.rs index 
3da978a4..b443429d 100644 --- a/crates/core/src/observability/plugin_component.rs +++ b/crates/core/src/observability/plugin_component.rs @@ -35,6 +35,7 @@ use crate::api::subscriber::{scope_deregister_subscriber, scope_register_subscri use crate::observability::atif::{AtifAgentInfo, AtifExporter}; use crate::observability::atof::{ AtofExporter, AtofExporterConfig as CoreAtofExporterConfig, AtofExporterMode, + AtofStreamingExporter, AtofStreamingExporterConfig as CoreAtofStreamingExporterConfig, }; #[cfg(feature = "openinference")] use crate::observability::openinference::{ @@ -110,6 +111,9 @@ pub struct ObservabilityConfig { /// Filesystem-backed raw ATOF JSONL export. #[serde(default, skip_serializing_if = "Option::is_none")] pub atof: Option, + /// TCP-backed raw ATOF JSONL stream export to a separate receiver process. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub atof_stream: Option, /// Per-top-level-agent ATIF trajectory export. #[serde(default, skip_serializing_if = "Option::is_none")] pub atif: Option, @@ -129,6 +133,7 @@ impl Default for ObservabilityConfig { Self { version: default_observability_config_version(), atof: None, + atof_stream: None, atif: None, opentelemetry: None, openinference: None, @@ -172,6 +177,24 @@ impl Default for AtofSectionConfig { } } +/// TCP-backed raw ATOF JSONL stream exporter config. +/// +/// When enabled, this section wraps +/// [`crate::observability::atof::AtofStreamingExporter`] and connects to a +/// separate local receiver process. The receiver owns UI, HTTP, SSE, +/// WebSocket, or any other fan-out; Relay only emits canonical ATOF JSONL over +/// the configured TCP socket. +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))] +pub struct AtofStreamSectionConfig { + /// Whether ATOF stream export is active. + #[serde(default)] + pub enabled: bool, + /// TCP address for the receiver process, for example `127.0.0.1:43199`. 
+ #[serde(default, skip_serializing_if = "Option::is_none")] + pub address: Option, +} + /// Per-trajectory ATIF exporter config. /// /// When enabled, this section creates a dispatcher that opens a separate @@ -372,6 +395,13 @@ crate::editor_config! { nested: AtofSectionConfig, default: AtofSectionConfig, }, + atof_stream => { + label: "ATOF Stream", + kind: Section, + optional: true, + nested: AtofStreamSectionConfig, + default: AtofStreamSectionConfig, + }, atif => { label: "ATIF", kind: Section, @@ -411,6 +441,13 @@ crate::editor_config! { } } +crate::editor_config! { + impl AtofStreamSectionConfig { + enabled => { label: "enabled", kind: Boolean }, + address => { label: "address", kind: String, optional: true }, + } +} + crate::editor_config! { impl AtifSectionConfig { enabled => { label: "enabled", kind: Boolean }, @@ -538,6 +575,9 @@ fn register_observability( if let Some(atof) = config.atof.filter(|section| section.enabled) { register_atof_exporter(atof, ctx)?; } + if let Some(atof_stream) = config.atof_stream.filter(|section| section.enabled) { + register_atof_stream_exporter(atof_stream, ctx)?; + } if let Some(atif) = config.atif.filter(|section| section.enabled) { register_atif_dispatcher(atif, ctx)?; } @@ -579,6 +619,38 @@ fn register_atof_exporter( Ok(()) } +fn register_atof_stream_exporter( + section: AtofStreamSectionConfig, + ctx: &mut PluginRegistrationContext, +) -> PluginResult<()> { + let Some(address) = section + .address + .as_deref() + .map(str::trim) + .filter(|address| !address.is_empty()) + .map(ToOwned::to_owned) + else { + return Err(PluginError::InvalidConfig( + "ATOF stream address is required when atof_stream is enabled".to_string(), + )); + }; + let exporter = Arc::new( + AtofStreamingExporter::new(CoreAtofStreamingExporterConfig::new(address)) + .map_err(observability_registration_error)?, + ); + ctx.register_subscriber("atof_stream", exporter.subscriber())?; + ctx.add_registration(PluginRegistration::new( + "observability", + 
ctx.qualify_name("atof_stream.shutdown"), + Box::new(move || { + exporter + .shutdown() + .map_err(observability_registration_error) + }), + )); + Ok(()) +} + type AtifStorageList = Arc>>; fn register_atif_dispatcher( @@ -1287,6 +1359,7 @@ fn validate_observability_plugin_config( &[ "version", "atof", + "atof_stream", "atif", "opentelemetry", "openinference", @@ -1303,6 +1376,13 @@ fn validate_observability_plugin_config( "atof", &["enabled", "output_directory", "filename", "mode"], ); + validate_section_fields( + &mut diagnostics, + &config.policy, + plugin_config, + "atof_stream", + &["enabled", "address"], + ); validate_section_fields( &mut diagnostics, &config.policy, @@ -1371,6 +1451,20 @@ fn validate_observability_plugin_config( ); } } + if let Some(section) = &config.atof_stream { + validate_atof_stream_values(&mut diagnostics, &config.policy, section); + #[cfg(target_arch = "wasm32")] + if section.enabled { + push_policy_diag( + &mut diagnostics, + config.policy.unsupported_value, + "observability.unsupported_value", + Some("atof_stream".to_string()), + Some("enabled".to_string()), + "ATOF stream export is not supported on WebAssembly".to_string(), + ); + } + } if let Some(section) = &config.atif { validate_atif_values(&mut diagnostics, &config.policy, section); #[cfg(target_arch = "wasm32")] @@ -1492,6 +1586,29 @@ fn validate_atof_values( } } +fn validate_atof_stream_values( + diagnostics: &mut Vec, + policy: &ConfigPolicy, + section: &AtofStreamSectionConfig, +) { + if section.enabled + && section + .address + .as_deref() + .map(str::trim) + .is_none_or(str::is_empty) + { + push_policy_diag( + diagnostics, + policy.unsupported_value, + "observability.unsupported_value", + Some("atof_stream".to_string()), + Some("address".to_string()), + "atof_stream address is required when enabled".to_string(), + ); + } +} + fn validate_atif_values( diagnostics: &mut Vec, policy: &ConfigPolicy, diff --git a/crates/core/tests/unit/observability/atof_tests.rs 
b/crates/core/tests/unit/observability/atof_tests.rs index 51fe6af2..0433b9d2 100644 --- a/crates/core/tests/unit/observability/atof_tests.rs +++ b/crates/core/tests/unit/observability/atof_tests.rs @@ -13,10 +13,17 @@ use crate::api::scope::{EmitMarkEventParams, PopScopeParams, PushScopeParams, Sc use crate::codec::request::{AnnotatedLlmRequest, Message, MessageContent}; use serde_json::{Map, json}; use std::fs; +use std::io::{BufRead, BufReader, ErrorKind, Read}; +use std::net::TcpListener; use std::sync::Arc; +use std::sync::mpsc; +use std::thread; +use std::time::{Duration, Instant}; use std::time::{SystemTime, UNIX_EPOCH}; use uuid::Uuid; +const TEST_RECV_TIMEOUT: Duration = Duration::from_secs(2); + fn temp_dir(prefix: &str) -> PathBuf { let id = SystemTime::now() .duration_since(UNIX_EPOCH) @@ -111,6 +118,112 @@ fn read_jsonl(path: &Path) -> Vec { .collect() } +fn is_expected_stream_termination(error: &std::io::Error) -> bool { + matches!( + error.kind(), + ErrorKind::BrokenPipe + | ErrorKind::ConnectionAborted + | ErrorKind::ConnectionReset + | ErrorKind::NotConnected + | ErrorKind::UnexpectedEof + ) +} + +struct AtofSocketSink { + address: String, + events: mpsc::Receiver>, + saw_eof: mpsc::Receiver, +} + +fn start_atof_socket_sink(expected_events: usize) -> AtofSocketSink { + let listener = TcpListener::bind("127.0.0.1:0").unwrap(); + listener.set_nonblocking(true).unwrap(); + let address = listener.local_addr().unwrap().to_string(); + let (sender, receiver) = mpsc::channel(); + let (eof_sender, eof_receiver) = mpsc::channel(); + thread::spawn(move || { + let deadline = Instant::now() + TEST_RECV_TIMEOUT; + let stream = loop { + match listener.accept() { + Ok((stream, _)) => break stream, + Err(error) if error.kind() == std::io::ErrorKind::WouldBlock => { + if Instant::now() >= deadline { + let _ = sender.send(Vec::new()); + let _ = eof_sender.send(false); + return; + } + thread::sleep(Duration::from_millis(10)); + } + Err(_) => { + let _ = 
sender.send(Vec::new()); + let _ = eof_sender.send(false); + return; + } + } + }; + stream.set_nonblocking(false).unwrap(); + stream.set_read_timeout(Some(TEST_RECV_TIMEOUT)).unwrap(); + let mut reader = BufReader::new(stream); + let mut events = Vec::with_capacity(expected_events); + for _ in 0..expected_events { + let mut line = String::new(); + match reader.read_line(&mut line) { + Ok(0) | Err(_) => break, + Ok(_) => events.push(serde_json::from_str(line.trim_end()).unwrap()), + } + } + let _ = sender.send(events); + let _ = reader.get_ref().set_read_timeout(None); + let mut drain = String::new(); + let saw_termination = match reader.read_to_string(&mut drain) { + Ok(_) => true, + Err(error) => is_expected_stream_termination(&error), + }; + let _ = eof_sender.send(saw_termination); + }); + AtofSocketSink { + address, + events: receiver, + saw_eof: eof_receiver, + } +} + +fn start_atof_eof_sink() -> (String, mpsc::Receiver) { + let listener = TcpListener::bind("127.0.0.1:0").unwrap(); + listener.set_nonblocking(true).unwrap(); + let address = listener.local_addr().unwrap().to_string(); + let (sender, receiver) = mpsc::channel(); + thread::spawn(move || { + let deadline = Instant::now() + TEST_RECV_TIMEOUT; + let stream = loop { + match listener.accept() { + Ok((stream, _)) => break stream, + Err(error) if error.kind() == std::io::ErrorKind::WouldBlock => { + if Instant::now() >= deadline { + let _ = sender.send(false); + return; + } + thread::sleep(Duration::from_millis(10)); + } + Err(_) => { + let _ = sender.send(false); + return; + } + } + }; + stream.set_nonblocking(false).unwrap(); + stream.set_read_timeout(Some(TEST_RECV_TIMEOUT)).unwrap(); + let mut reader = BufReader::new(stream); + let mut buffer = String::new(); + let saw_termination = match reader.read_to_string(&mut buffer) { + Ok(_) => true, + Err(error) => is_expected_stream_termination(&error), + }; + let _ = sender.send(saw_termination); + }); + (address, receiver) +} + #[test] fn 
default_config_uses_cwd_append_and_timestamped_filename() { let config = AtofExporterConfig::default(); @@ -218,6 +331,198 @@ fn subscriber_writes_canonical_event_jsonl() { ); } +#[test] +fn streaming_exporter_writes_canonical_event_json_values_to_socket() { + let sink = start_atof_socket_sink(1); + let exporter = AtofStreamingExporter::connect(sink.address.clone()).unwrap(); + let event = make_annotated_llm_event("streamed-llm-start"); + + (exporter.subscriber())(&event); + + let delivered = sink.events.recv_timeout(TEST_RECV_TIMEOUT).unwrap(); + exporter.shutdown().unwrap(); + + assert_eq!(delivered[0], event.try_to_json_value().unwrap()); + assert_eq!(exporter.stats().events_sent, 1); + assert!(sink.saw_eof.recv_timeout(TEST_RECV_TIMEOUT).unwrap()); +} + +#[test] +fn streaming_exporter_drops_when_queue_is_full_without_poisoning_stream() { + let (sender, receiver) = mpsc::sync_channel(1); + assert!( + sender + .try_send(AtofStreamMessage::Event("{\"queued\":true}".to_string())) + .is_ok() + ); + let exporter = AtofStreamingExporter { + address: "127.0.0.1:0".to_string(), + state: Arc::new(Mutex::new(AtofStreamingExporterState { + sender: Some(sender), + writer_thread: None, + events_sent: 0, + events_dropped: 0, + last_error: Arc::new(Mutex::new(None)), + })), + }; + + (exporter.subscriber())(&make_mark_event("queue-full")); + + let stats = exporter.stats(); + assert_eq!(stats.events_sent, 0); + assert_eq!(stats.events_dropped, 1); + assert_eq!(stats.last_error, None); + assert!(matches!( + receiver.try_recv().unwrap(), + AtofStreamMessage::Event(value) if value == "{\"queued\":true}" + )); +} + +#[test] +fn streaming_exporter_reports_connection_failure() { + let listener = TcpListener::bind("127.0.0.1:0").unwrap(); + let address = listener.local_addr().unwrap().to_string(); + drop(listener); + + let error = match AtofStreamingExporter::connect(address) { + Ok(_) => panic!("expected streaming exporter connection to fail"), + Err(error) => error, + }; + + 
assert!(matches!(error, AtofExporterError::ConnectStream { .. })); +} + +#[test] +fn streaming_exporter_configuration_error_names_socket_options() { + let error = AtofExporterError::ConfigureStream { + address: "127.0.0.1:65535".to_string(), + operation: "set_write_timeout", + timeout: Some(ATOF_STREAM_WRITE_TIMEOUT), + source: std::io::Error::other("timeout option rejected"), + } + .to_string(); + + assert!(error.contains("set_write_timeout")); + assert!(error.contains("ATOF_STREAM_WRITE_TIMEOUT")); +} + +#[test] +fn streaming_exporter_shutdown_closes_stream_after_stored_error() { + let (address, receiver) = start_atof_eof_sink(); + let exporter = AtofStreamingExporter::connect(address).unwrap(); + + let last_error = Arc::clone(&exporter.state.lock().unwrap().last_error); + *last_error.lock().unwrap() = Some("forced failure".to_string()); + let error = exporter.shutdown().unwrap_err(); + + assert!(matches!( + error, + AtofExporterError::StoredStreamFailure { .. } + )); + assert!(receiver.recv_timeout(TEST_RECV_TIMEOUT).unwrap()); +} + +#[test] +fn streaming_exporter_preserves_first_stored_error() { + let last_error = Arc::new(Mutex::new(None)); + + store_stream_error(&last_error, "first failure".to_string()); + store_stream_error(&last_error, "later failure".to_string()); + + assert_eq!( + stream_last_error(&last_error), + Some("first failure".to_string()) + ); +} + +#[test] +fn streaming_exporter_registers_with_runtime_events() { + let _guard = crate::observability::test_mutex().lock().unwrap(); + reset_global(); + + let pre_handle = crate::api::scope::push_scope( + PushScopeParams::builder() + .name("pre_atof_streaming_scope") + .scope_type(ScopeType::Agent) + .input(json!({"before": true})) + .build(), + ) + .unwrap(); + crate::api::scope::event( + EmitMarkEventParams::builder() + .name("pre_atof_streaming_mark") + .parent(&pre_handle) + .data(json!({"before": true})) + .build(), + ) + .unwrap(); + crate::api::scope::pop_scope( + PopScopeParams::builder() + 
.handle_uuid(&pre_handle.uuid) + .output(json!({"before": true})) + .build(), + ) + .unwrap(); + + let sink = start_atof_socket_sink(3); + let exporter = AtofStreamingExporter::connect(sink.address.clone()).unwrap(); + let name = format!("atof_streaming_exporter_{}", Uuid::now_v7()); + + exporter.register(&name).unwrap(); + let handle = crate::api::scope::push_scope( + PushScopeParams::builder() + .name("atof_streaming_scope") + .scope_type(ScopeType::Agent) + .input(json!({"scope": true})) + .build(), + ) + .unwrap(); + crate::api::scope::event( + EmitMarkEventParams::builder() + .name("atof_streaming_mark") + .parent(&handle) + .data(json!({"mark": true})) + .build(), + ) + .unwrap(); + crate::api::scope::pop_scope( + PopScopeParams::builder() + .handle_uuid(&handle.uuid) + .output(json!({"done": true})) + .build(), + ) + .unwrap(); + + assert!(exporter.deregister(&name).unwrap()); + assert!(!exporter.deregister(&name).unwrap()); + + let events = sink.events.recv_timeout(TEST_RECV_TIMEOUT).unwrap(); + exporter.shutdown().unwrap(); + + let scope_start = &events[0]; + let mark = &events[1]; + let scope_end = &events[2]; + + assert_eq!(scope_start["name"], "atof_streaming_scope"); + assert_eq!(scope_start["scope_category"], "start"); + assert_eq!(mark["name"], "atof_streaming_mark"); + assert_eq!(mark["kind"], "mark"); + assert_eq!(scope_end["name"], "atof_streaming_scope"); + assert_eq!(scope_end["scope_category"], "end"); + assert!( + events + .iter() + .all(|event| event["name"] != "pre_atof_streaming_scope") + ); + assert!( + events + .iter() + .all(|event| event["name"] != "pre_atof_streaming_mark") + ); + assert_eq!(exporter.stats().events_sent, 3); + assert!(sink.saw_eof.recv_timeout(TEST_RECV_TIMEOUT).unwrap()); +} + #[test] fn register_deregister_flush_and_shutdown_work_with_runtime_events() { let _guard = crate::observability::test_mutex().lock().unwrap(); diff --git a/crates/core/tests/unit/observability/plugin_component_tests.rs 
b/crates/core/tests/unit/observability/plugin_component_tests.rs index 540848f4..cd66bf97 100644 --- a/crates/core/tests/unit/observability/plugin_component_tests.rs +++ b/crates/core/tests/unit/observability/plugin_component_tests.rs @@ -17,7 +17,53 @@ use crate::plugin::{ }; use serde_json::json; use std::fs; -use std::time::{SystemTime, UNIX_EPOCH}; +use std::io::{BufRead, BufReader, ErrorKind}; +use std::net::TcpListener; +use std::sync::mpsc; +use std::thread; +use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; + +struct AtofStreamSink { + address: String, + receiver: mpsc::Receiver, +} + +fn start_atof_stream_sink() -> AtofStreamSink { + let listener = TcpListener::bind("127.0.0.1:0").unwrap(); + listener.set_nonblocking(true).unwrap(); + let address = listener.local_addr().unwrap().to_string(); + let (sender, receiver) = mpsc::channel(); + thread::spawn(move || { + let deadline = Instant::now() + Duration::from_secs(2); + loop { + match listener.accept() { + Ok((stream, _)) => { + let _ = stream.set_read_timeout(Some(Duration::from_secs(2))); + let reader = BufReader::new(stream); + for line in reader.lines() { + match line { + Ok(line) if !line.trim().is_empty() => { + if let Ok(value) = serde_json::from_str(&line) { + let _ = sender.send(value); + } + } + Ok(_) => {} + Err(_) => break, + } + } + break; + } + Err(error) + if error.kind() == ErrorKind::WouldBlock && Instant::now() < deadline => + { + thread::sleep(Duration::from_millis(10)); + } + Err(_) => break, + } + } + }); + AtofStreamSink { address, receiver } +} fn temp_dir(prefix: &str) -> PathBuf { let id = SystemTime::now() @@ -68,6 +114,18 @@ fn editor_schema_tracks_observability_config_types() { assert_eq!(mode.kind, EditorFieldKind::Enum); assert_eq!(mode.enum_values, &["append", "overwrite"]); + let stream = schema.field("atof_stream").expect("atof_stream section"); + assert_eq!(stream.label, "ATOF Stream"); + assert_eq!(stream.kind, EditorFieldKind::Section); + 
assert!(stream.optional); + + let stream_schema = stream.schema().expect("atof_stream editor schema"); + let address = stream_schema + .field("address") + .expect("stream address field"); + assert_eq!(address.kind, EditorFieldKind::String); + assert!(address.optional); + let otlp = schema .field("openinference") .expect("openinference section") @@ -161,6 +219,7 @@ fn default_config_and_component_conversion_cover_public_shape() { let defaults = ObservabilityConfig::default(); assert_eq!(defaults.version, 1); assert!(defaults.atof.is_none()); + assert!(defaults.atof_stream.is_none()); assert!(defaults.atif.is_none()); assert!(defaults.opentelemetry.is_none()); assert!(defaults.openinference.is_none()); @@ -171,6 +230,10 @@ fn default_config_and_component_conversion_cover_public_shape() { assert!(atof.output_directory.is_none()); assert!(atof.filename.is_none()); + let atof_stream = AtofStreamSectionConfig::default(); + assert!(!atof_stream.enabled); + assert!(atof_stream.address.is_none()); + let atif = AtifSectionConfig::default(); assert!(!atif.enabled); assert_eq!(atif.agent_name, "NeMo Relay"); @@ -186,6 +249,7 @@ fn default_config_and_component_conversion_cover_public_shape() { let generic: PluginComponentSpec = ComponentSpec::new(ObservabilityConfig { atof: Some(atof), + atof_stream: Some(atof_stream), atif: Some(atif), opentelemetry: Some(otlp.clone()), openinference: Some(otlp), @@ -195,6 +259,7 @@ fn default_config_and_component_conversion_cover_public_shape() { assert_eq!(generic.kind, OBSERVABILITY_PLUGIN_KIND); assert!(generic.enabled); assert_eq!(generic.config["version"], json!(1)); + assert_eq!(generic.config["atof_stream"]["enabled"], json!(false)); assert_eq!(generic.config["atif"]["agent_name"], json!("NeMo Relay")); } @@ -205,6 +270,7 @@ fn schema_contains_every_supported_observability_option() { for field in [ "version", "atof", + "atof_stream", "atif", "opentelemetry", "openinference", @@ -213,6 +279,7 @@ fn 
schema_contains_every_supported_observability_option() { "output_directory", "filename", "mode", + "address", "agent_name", "agent_version", "model_name", @@ -310,6 +377,7 @@ fn empty_and_disabled_config_register_nothing() { let config = plugin_config(json!({ "atof": {"enabled": false, "mode": "overwrite"}, + "atof_stream": {"enabled": false}, "atif": {"enabled": false}, "opentelemetry": {"enabled": false, "transport": "grpc"}, "openinference": {"enabled": false, "transport": "grpc"} @@ -377,6 +445,7 @@ fn unknown_fields_and_bad_values_follow_policy() { let warn_report = validate_plugin_config(&plugin_config(json!({ "atof": {"bogus": true, "mode": "invalid"}, + "atof_stream": {"enabled": true}, "atif": {"filename_template": "missing-session"} }))); assert!(warn_report.has_errors()); @@ -398,10 +467,18 @@ fn unknown_fields_and_bad_values_follow_policy() { .iter() .any(|diag| diag.field.as_deref() == Some("filename_template")) ); + assert!( + warn_report + .diagnostics + .iter() + .any(|diag| diag.component.as_deref() == Some("atof_stream") + && diag.field.as_deref() == Some("address")) + ); let ignore_report = validate_plugin_config(&plugin_config(json!({ "policy": {"unknown_field": "ignore", "unsupported_value": "ignore"}, "atof": {"bogus": true, "mode": "invalid"}, + "atof_stream": {"enabled": true}, "atif": {"filename_template": "missing-session"} }))); assert!(!ignore_report.has_errors()); @@ -520,6 +597,13 @@ fn initialization_fails_for_invalid_enabled_file_exporters() { let error = futures::executor::block_on(initialize_plugins(invalid_openinference_transport)) .unwrap_err(); assert!(error.to_string().contains("OpenInference transport")); + + let invalid_atof_stream = plugin_config(json!({ + "policy": {"unsupported_value": "ignore"}, + "atof_stream": {"enabled": true} + })); + let error = futures::executor::block_on(initialize_plugins(invalid_atof_stream)).unwrap_err(); + assert!(error.to_string().contains("ATOF stream address")); } #[test] @@ -570,6 +654,65 
@@ fn atof_enabled_writes_jsonl_and_teardown_flushes() { assert!(lines[2].contains("\"scope_category\":\"end\"")); } +#[test] +fn atof_stream_enabled_writes_jsonl_to_receiver_and_teardown_flushes() { + let _guard = crate::observability::test_mutex().lock().unwrap(); + reset_runtime(); + let sink = start_atof_stream_sink(); + + let config = plugin_config(json!({ + "atof_stream": { + "enabled": true, + "address": sink.address + } + })); + assert!(!validate_plugin_config(&config).has_errors()); + futures::executor::block_on(initialize_plugins(config)).unwrap(); + + { + let state = global_context(); + let names = state + .read() + .unwrap() + .event_subscribers + .keys() + .cloned() + .collect::<Vec<_>>(); + assert_eq!( + names, + vec!["__nemo_relay_plugin__observability__atof_stream"] + ); + } + + let agent = push_agent("atof-stream-agent"); + crate::api::scope::event( + crate::api::scope::EmitMarkEventParams::builder() + .name("checkpoint") + .parent(&agent) + .data(json!({"step": 1})) + .build(), + ) + .unwrap(); + pop(&agent); + clear_plugin_configuration().unwrap(); + + let records = (0..3) + .map(|_| { + sink.receiver + .recv_timeout(Duration::from_secs(2)) + .expect("streamed ATOF record") + }) + .collect::<Vec<_>>(); + assert_eq!( + records + .iter() + .map(|record| record["kind"].as_str().unwrap()) + .collect::<Vec<_>>(), + vec!["scope", "mark", "scope"] + ); + assert_eq!(records[1]["name"], json!("checkpoint")); +} + #[test] fn atif_defaults_create_one_file_per_top_level_agent() { let _guard = crate::observability::test_mutex().lock().unwrap(); diff --git a/crates/node/observability.d.ts b/crates/node/observability.d.ts index cae4555e..8ad0865c 100644 --- a/crates/node/observability.d.ts +++ b/crates/node/observability.d.ts @@ -13,6 +13,11 @@ export interface AtofConfig { mode?: 'append' | 'overwrite' | string; } +export interface AtofStreamConfig { + enabled?: boolean; + address?: string; +} + export interface S3StorageConfig { type: 's3'; bucket: string; @@ -53,6 +58,7 @@ 
export interface OtlpConfig { export interface Config { version?: number; atof?: AtofConfig; + atof_stream?: AtofStreamConfig; atif?: AtifConfig; opentelemetry?: OtlpConfig; openinference?: OtlpConfig; @@ -71,6 +77,8 @@ export declare const OBSERVABILITY_PLUGIN_KIND: 'observability'; export declare function defaultConfig(): Config; /** Create filesystem-backed Agent Trajectory Observability Format (ATOF) JSONL settings with defaults applied. */ export declare function atofConfig(config?: AtofConfig): AtofConfig; +/** Create TCP-backed Agent Trajectory Observability Format (ATOF) JSONL stream settings with defaults applied. */ +export declare function atofStreamConfig(config?: AtofStreamConfig): AtofStreamConfig; /** Create per-agent Agent Trajectory Interchange Format (ATIF) trajectory settings with defaults applied. */ export declare function atifConfig(config?: AtifConfig): AtifConfig; /** Create OTLP exporter settings for OpenTelemetry or OpenInference. */ diff --git a/crates/node/observability.js b/crates/node/observability.js index 7ce69c93..9152da46 100644 --- a/crates/node/observability.js +++ b/crates/node/observability.js @@ -32,6 +32,19 @@ function atofConfig(config = {}) { }; } +/** + * Create TCP-backed ATOF JSONL stream settings with defaults applied. + * + * @param {object} [config={}] - Partial ATOF stream settings to override. + * @returns {object} A normalized ATOF stream config object. + */ +function atofStreamConfig(config = {}) { + return { + enabled: false, + ...config, + }; +} + /** * Create per-agent ATIF trajectory settings with defaults applied. 
* @@ -83,6 +96,7 @@ module.exports = { OBSERVABILITY_PLUGIN_KIND, defaultConfig, atofConfig, + atofStreamConfig, atifConfig, otlpConfig, ComponentSpec, diff --git a/crates/node/tests/observability_plugin_tests.mjs b/crates/node/tests/observability_plugin_tests.mjs index c4e21fc1..de1bf7d0 100644 --- a/crates/node/tests/observability_plugin_tests.mjs +++ b/crates/node/tests/observability_plugin_tests.mjs @@ -21,6 +21,7 @@ describe('observability plugin helpers', () => { it('builds defaults and plugin component shape', () => { assert.deepEqual(observability.defaultConfig(), { version: 1 }); assert.deepEqual(observability.atofConfig(), { enabled: false, mode: 'append' }); + assert.deepEqual(observability.atofStreamConfig(), { enabled: false }); assert.deepEqual(observability.atifConfig(), { enabled: false, agent_name: 'NeMo Relay', @@ -49,11 +50,16 @@ describe('observability plugin helpers', () => { observability.ComponentSpec({ version: 1, atof: observability.atofConfig({ mode: 'bad' }), + atof_stream: observability.atofStreamConfig({ enabled: true }), atif: observability.atifConfig({ filename_template: 'missing-placeholder.json' }), }), ], }); - assert.deepEqual(report.diagnostics.map((diagnostic) => diagnostic.field).sort(), ['filename_template', 'mode']); + assert.deepEqual(report.diagnostics.map((diagnostic) => diagnostic.field).sort(), [ + 'address', + 'filename_template', + 'mode', + ]); }); it('activates ATOF and ATIF file sinks', async () => { diff --git a/docs/observability-plugin/about.mdx b/docs/observability-plugin/about.mdx index 5e3eeef8..776b52ae 100644 --- a/docs/observability-plugin/about.mdx +++ b/docs/observability-plugin/about.mdx @@ -21,6 +21,7 @@ Format (ATIF), OpenTelemetry, or OpenInference. The first-party plugin component has kind `observability`. It can install: - Agent Trajectory Observability Format (ATOF) JSONL export for raw lifecycle events. 
+- Streaming Agent Trajectory Observability Format (ATOF) JSONL export to a separate receiver process. - Agent Trajectory Interchange Format (ATIF) trajectory export for each top-level agent scope. - OpenTelemetry OTLP trace export. - OpenInference-oriented OTLP trace export. @@ -55,6 +56,7 @@ Choose the exporter based on the downstream system: | Need | Use | |---|---| | Raw canonical event stream | [Agent Trajectory Observability Format (ATOF)](/observability-plugin/atof) | +| Live process-separated raw event stream | [Streaming ATOF Design](/observability-plugin/streaming-atof-design) | | Offline analysis, replay, or evaluation trajectories | [Agent Trajectory Interchange Format (ATIF)](/observability-plugin/atif) | | Generic OTLP traces | [OpenTelemetry](/observability-plugin/opentelemetry) | | OpenInference-oriented agent and LLM spans | [OpenInference](/observability-plugin/openinference) | @@ -85,6 +87,8 @@ are not written into ATIF. - [Observability Configuration](/observability-plugin/configuration) documents the whole plugin component shape, activation, validation, and teardown. - [Agent Trajectory Observability Format (ATOF)](/observability-plugin/atof) covers raw JSONL event stream export. +- [Streaming ATOF Design](/observability-plugin/streaming-atof-design) covers TCP JSONL + streaming to a separate receiver process. - [Agent Trajectory Interchange Format (ATIF)](/observability-plugin/atif) covers per-agent trajectory export. - [OpenTelemetry](/observability-plugin/opentelemetry) covers generic OTLP trace export. 
- [OpenInference](/observability-plugin/openinference) covers OpenInference-oriented OTLP trace diff --git a/docs/observability-plugin/atof.mdx b/docs/observability-plugin/atof.mdx index df705473..63eccc2e 100644 --- a/docs/observability-plugin/atof.mdx +++ b/docs/observability-plugin/atof.mdx @@ -230,6 +230,62 @@ exporter.shutdown()?; +## Streaming API + +Use `AtofStreamingExporter` when an application needs to consume canonical ATOF +events as they are emitted instead of reading a completed JSONL file. The +streaming exporter connects to a separate local process over TCP and writes one +canonical ATOF JSON object per JSONL line. The receiver process can then fan the +events out through HTTP, SSE, WebSocket, stdout, or another local transport +without changing the ATOF event contract. `AtofStreamingExporter` uses a +per-exporter bounded queue: producers do not block, the newest event is dropped +when that bounded queue is full, and `stats().events_dropped` records the +overflow while fatal stream disconnects are still surfaced as errors. + +```rust +use nemo_relay::observability::atof::{ + AtofStreamingExporter, AtofStreamingExporterConfig, +}; + +// Start a separate local receiver process before connecting the exporter. +let config = AtofStreamingExporterConfig::new("127.0.0.1:43199"); +let exporter = AtofStreamingExporter::new(config)?; +exporter.register("atof-stream")?; + +// Run instrumented application work here. + +let _ = exporter.deregister("atof-stream")?; +exporter.shutdown()?; +let stats = exporter.stats(); +assert!(stats.events_sent > 0); +``` + +Register the streaming exporter before the instrumented work starts. The +receiver process only observes future events and does not replay earlier +lifecycle events. + +For plugin-managed streaming, use the Observability plugin `atof_stream` +section. The design details and failure semantics are documented in +[Streaming ATOF Design](/observability-plugin/streaming-atof-design). 
+ +A minimal receiver can block on TCP lines and exit when the exporter closes the +connection. This keeps short idle gaps from ending the stream: + +```rust +use serde_json::{from_str, to_string, Value}; +use std::io::{BufRead, BufReader}; +use std::net::TcpListener; + +let listener = TcpListener::bind("127.0.0.1:43199")?; +let (stream, _) = listener.accept()?; +let reader = BufReader::new(stream); + +for line in reader.lines() { + let event: Value = from_str(&line?)?; + println!("{}", to_string(&event)?); +} +``` + ## Common Validation Failures - `mode` is not `append` or `overwrite`. diff --git a/docs/observability-plugin/configuration.mdx b/docs/observability-plugin/configuration.mdx index 3bedd30f..083aec6f 100644 --- a/docs/observability-plugin/configuration.mdx +++ b/docs/observability-plugin/configuration.mdx @@ -36,6 +36,7 @@ only when it includes `enabled: true`. | Section | Runtime behavior | |---|---| | `atof` | Registers a global Agent Trajectory Observability Format (ATOF) JSONL exporter for raw lifecycle events. | +| `atof_stream` | Registers a global TCP-backed ATOF JSONL stream exporter to a separate receiver process. | | `atif` | Registers one Agent Trajectory Interchange Format (ATIF) dispatcher that writes one trajectory file for each top-level agent scope. | | `opentelemetry` | Registers a global OpenTelemetry OTLP subscriber. | | `openinference` | Registers a global OpenInference OTLP subscriber. 
| @@ -45,6 +46,7 @@ component-local subscriber names and registers them under the observability plugin namespace: - Agent Trajectory Observability Format (ATOF): `__nemo_relay_plugin__observability__atof` +- Streaming Agent Trajectory Observability Format (ATOF): `__nemo_relay_plugin__observability__atof_stream` - Agent Trajectory Interchange Format (ATIF) dispatcher: `__nemo_relay_plugin__observability__atif` - Per-agent ATIF scope subscriber: `__nemo_relay_plugin__observability__atif-{agent_scope_uuid}` - OpenTelemetry: `__nemo_relay_plugin__observability__opentelemetry` @@ -68,6 +70,10 @@ output_directory = "logs" filename = "events.jsonl" mode = "overwrite" +[components.config.atof_stream] +enabled = true +address = "127.0.0.1:43199" + [components.config.atif] enabled = true output_directory = "logs" @@ -128,6 +134,7 @@ from nemo_relay import plugin, scope, ScopeType from nemo_relay.observability import ( AtifConfig, AtofConfig, + AtofStreamConfig, ComponentSpec, ObservabilityConfig, OtlpConfig, @@ -143,6 +150,10 @@ config = plugin.PluginConfig( filename="events.jsonl", mode="overwrite", ), + atof_stream=AtofStreamConfig( + enabled=True, + address="127.0.0.1:43199", + ), atif=AtifConfig( enabled=True, output_directory="logs", @@ -201,6 +212,10 @@ await plugin.initialize({ filename: "events.jsonl", mode: "overwrite", }), + atof_stream: observability.atofStreamConfig({ + enabled: true, + address: "127.0.0.1:43199", + }), atif: observability.atifConfig({ enabled: true, output_directory: "logs", @@ -244,8 +259,8 @@ try { ```rust use nemo_relay::observability::plugin_component::{ - AtifSectionConfig, AtofSectionConfig, ComponentSpec, ObservabilityConfig, - OtlpSectionConfig, + AtifSectionConfig, AtofSectionConfig, AtofStreamSectionConfig, + ComponentSpec, ObservabilityConfig, OtlpSectionConfig, }; use nemo_relay::plugin::{initialize_plugins, validate_plugin_config, PluginConfig}; @@ -256,6 +271,10 @@ let component = ComponentSpec::new(ObservabilityConfig { filename: 
Some("events.jsonl".into()), mode: "overwrite".into(), }), + atof_stream: Some(AtofStreamSectionConfig { + enabled: true, + address: Some("127.0.0.1:43199".into()), + }), atif: Some(AtifSectionConfig { enabled: true, output_directory: Some("logs".into()), diff --git a/docs/observability-plugin/streaming-atof-design.mdx b/docs/observability-plugin/streaming-atof-design.mdx new file mode 100644 index 00000000..9f51162f --- /dev/null +++ b/docs/observability-plugin/streaming-atof-design.mdx @@ -0,0 +1,113 @@ +--- +title: "Streaming ATOF Design" +sidebar-title: "Streaming ATOF Design" +description: "" +position: 7 +--- +{/* SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +SPDX-License-Identifier: Apache-2.0 */} + +This design covers streaming Agent Trajectory Observability Format (ATOF) +events from an instrumented process to a separate receiver process. + +It is intentionally limited to the NeMo Relay substrate: capture, plugin +configuration, raw ATOF emission, delivery behavior, validation, and shutdown. +Viewer UI, dashboards, browser surfaces, and product-specific applications are +downstream consumers of this stream and are not part of this design. + +## Problem + +The filesystem ATOF exporter writes useful canonical JSONL, but a local +receiver cannot inspect events until it can read the file. Tooling that wants a +live view of agent scopes, marks, tool calls, LLM calls, and lifecycle events +needs a process-separated stream without embedding a UI server or viewer inside +the instrumented runtime. + +## Goals + +- Emit canonical ATOF JSON objects as JSONL over a TCP connection. +- Keep the receiver in a separate process from the instrumented agent runtime. +- Expose the stream through the Observability plugin config shape. +- Keep event production off the hot path with bounded buffering. +- Preserve the existing ATOF event contract and sanitizer behavior. +- Surface configuration and delivery failures explicitly. 
+ +## Non-Goals + +- No browser UI, local dashboard, or product-specific viewer implementation. +- No HTTP, Server-Sent Events, WebSocket, or multi-client fan-out server. +- No replacement for ATIF, OpenTelemetry, or OpenInference exporters. +- No durable replay, backfill, or local database. +- No new PII redaction policy in this change. + +## Plugin Configuration + +The Observability plugin gets a dedicated `atof_stream` section: + +```toml +[components.config.atof_stream] +enabled = true +address = "127.0.0.1:43199" +``` + +The receiver must already be listening at `address` before plugin +initialization. Missing or blank `address` is a validation error when +`enabled = true`. + +## Architecture + +```mermaid +flowchart LR + App["Instrumented app or agent"] --> Events["NeMo Relay ATOF events"] + Events --> Plugin["Observability plugin"] + Plugin --> Exporter["AtofStreamingExporter"] + Exporter -->|TCP JSONL| Receiver["Separate receiver process"] + Receiver --> Consumer["Downstream consumer"] +``` + +The Observability plugin owns registration and teardown. The stream subscriber +is registered under: + +```text +__nemo_relay_plugin__observability__atof_stream +``` + +## Runtime Behavior + +`AtofStreamingExporter` connects to one TCP receiver and writes one JSON object +per line. It uses a bounded per-exporter queue so event producers do not block +on receiver I/O. If the receiver cannot keep up and the queue is full, the +newest event is dropped and `stats().events_dropped` records the overflow. + +Connection failures during initialization fail plugin registration. Write or +flush failures are recorded as exporter errors and surfaced during flush or +shutdown. Shutdown flushes queued events, closes the stream, and lets the +receiver observe EOF. + +## Failure Semantics + +| Condition | Behavior | +|---|---| +| `atof_stream.enabled = true` without `address` | Validation error. | +| Receiver is not listening | Plugin initialization fails. 
| +| Receiver disconnects mid-run | Exporter records the stream error; flush/shutdown surfaces it. | +| Bounded queue is full | Newest event is dropped; dropped count is available through exporter stats. | +| Plugin clear/shutdown | Subscriber is deregistered, queued events are flushed, stream closes. | + +## Validation Plan + +- Unit test plugin schema/editor support for `atof_stream`. +- Unit test validation for missing stream address. +- Unit test plugin initialization registers `atof_stream` under the expected + subscriber name. +- Unit test a separate TCP receiver gets `scope`, `mark`, and `scope` records + and observes clean shutdown. +- Binding helper tests cover Python, Go, and Node.js config helper shapes. + +## Open Questions + +- Should future stream receivers support authenticated local sockets or Unix + domain sockets? +- Should delivery stats become a generic exporter health surface? +- Should redaction hooks become configurable before an event reaches every + exporter? diff --git a/go/nemo_relay/observability_plugin.go b/go/nemo_relay/observability_plugin.go index 2b72b8d2..910638d0 100644 --- a/go/nemo_relay/observability_plugin.go +++ b/go/nemo_relay/observability_plugin.go @@ -8,12 +8,13 @@ const ObservabilityPluginKind = "observability" // ObservabilityConfig is the canonical Go shape for the observability plugin config document. 
type ObservabilityConfig struct { - Version uint32 `json:"version,omitempty"` - Atof *ObservabilityAtofConfig `json:"atof,omitempty"` - Atif *ObservabilityAtifConfig `json:"atif,omitempty"` - OpenTelemetry *ObservabilityOtlpConfig `json:"opentelemetry,omitempty"` - OpenInference *ObservabilityOtlpConfig `json:"openinference,omitempty"` - Policy *ConfigPolicy `json:"policy,omitempty"` + Version uint32 `json:"version,omitempty"` + Atof *ObservabilityAtofConfig `json:"atof,omitempty"` + AtofStream *ObservabilityAtofStreamConfig `json:"atof_stream,omitempty"` + Atif *ObservabilityAtifConfig `json:"atif,omitempty"` + OpenTelemetry *ObservabilityOtlpConfig `json:"opentelemetry,omitempty"` + OpenInference *ObservabilityOtlpConfig `json:"openinference,omitempty"` + Policy *ConfigPolicy `json:"policy,omitempty"` } // ObservabilityAtofConfig configures filesystem-backed raw ATOF JSONL export. @@ -24,6 +25,12 @@ type ObservabilityAtofConfig struct { Mode string `json:"mode,omitempty"` } +// ObservabilityAtofStreamConfig configures TCP-backed raw ATOF JSONL stream export. +type ObservabilityAtofStreamConfig struct { + Enabled bool `json:"enabled,omitempty"` + Address string `json:"address,omitempty"` +} + // ObservabilityAtifConfig configures per-top-level-agent ATIF file export. type ObservabilityAtifConfig struct { Enabled bool `json:"enabled,omitempty"` @@ -68,6 +75,11 @@ func NewObservabilityAtofConfig() ObservabilityAtofConfig { } } +// NewObservabilityAtofStreamConfig returns disabled ATOF stream settings. +func NewObservabilityAtofStreamConfig() ObservabilityAtofStreamConfig { + return ObservabilityAtofStreamConfig{} +} + // NewObservabilityAtifConfig returns disabled ATIF settings with core defaults. 
func NewObservabilityAtifConfig() ObservabilityAtifConfig { return ObservabilityAtifConfig{ diff --git a/go/nemo_relay/observability_plugin_test.go b/go/nemo_relay/observability_plugin_test.go index f5a7fbe4..9da83dbc 100644 --- a/go/nemo_relay/observability_plugin_test.go +++ b/go/nemo_relay/observability_plugin_test.go @@ -31,6 +31,10 @@ func TestObservabilityConfigHelpers(t *testing.T) { if atof.Enabled || atof.Mode != "append" { t.Fatalf("unexpected ATOF defaults: %#v", atof) } + atofStream := NewObservabilityAtofStreamConfig() + if atofStream.Enabled || atofStream.Address != "" { + t.Fatalf("unexpected ATOF stream defaults: %#v", atofStream) + } atif := NewObservabilityAtifConfig() if atif.Enabled || atif.AgentName != "NeMo Relay" || atif.ModelName != "unknown" || atif.FilenameTemplate != "nemo-relay-atif-{session_id}.json" { t.Fatalf("unexpected ATIF defaults: %#v", atif) @@ -41,6 +45,7 @@ func TestObservabilityConfigHelpers(t *testing.T) { } config.Atof = &atof + config.AtofStream = &atofStream wrapped := ObservabilityComponent(config) if wrapped.Kind != ObservabilityPluginKind || !wrapped.Enabled { t.Fatalf("unexpected component wrapper: %#v", wrapped) @@ -48,6 +53,9 @@ func TestObservabilityConfigHelpers(t *testing.T) { if _, ok := wrapped.Config["atof"].(map[string]any); !ok { t.Fatalf("expected serialized ATOF config object, got %#v", wrapped.Config) } + if _, ok := wrapped.Config["atof_stream"].(map[string]any); !ok { + t.Fatalf("expected serialized ATOF stream config object, got %#v", wrapped.Config) + } } func TestObservabilityPluginAtofAndAtifFiles(t *testing.T) { @@ -179,6 +187,9 @@ func TestObservabilityPluginValidationRejectsBadValues(t *testing.T) { atof := NewObservabilityAtofConfig() atof.Mode = "bad" config.Atof = &atof + atofStream := NewObservabilityAtofStreamConfig() + atofStream.Enabled = true + config.AtofStream = &atofStream atif := NewObservabilityAtifConfig() atif.FilenameTemplate = "missing-placeholder.json" config.Atif = &atif @@ 
-187,7 +198,7 @@ func TestObservabilityPluginValidationRejectsBadValues(t *testing.T) { if err != nil { t.Fatalf("ValidatePluginConfig failed: %v", err) } - if len(report.Diagnostics) < 2 { + if len(report.Diagnostics) < 3 { t.Fatalf("expected validation diagnostics, got %#v", report.Diagnostics) } } diff --git a/python/nemo_relay/observability.py b/python/nemo_relay/observability.py index 9917f1f2..8c88b761 100644 --- a/python/nemo_relay/observability.py +++ b/python/nemo_relay/observability.py @@ -73,6 +73,23 @@ def to_dict(self) -> JsonObject: ) +@dataclass(slots=True) +class AtofStreamConfig: + """TCP-backed raw ATOF JSONL stream export settings.""" + + enabled: bool = False + address: str | None = None + + def to_dict(self) -> JsonObject: + """Serialize this ATOF stream config to the canonical JSON object shape.""" + return _normalize_object( + { + "enabled": self.enabled, + "address": self.address, + } + ) + + @dataclass(slots=True) class S3StorageConfig: """S3-compatible remote storage settings for ATIF trajectory upload. @@ -181,6 +198,7 @@ class ObservabilityConfig: version: int = 1 atof: AtofConfig | None = None + atof_stream: AtofStreamConfig | None = None atif: AtifConfig | None = None opentelemetry: OtlpConfig | None = None openinference: OtlpConfig | None = None @@ -192,6 +210,7 @@ def to_dict(self) -> JsonObject: { "version": self.version, "atof": self.atof, + "atof_stream": self.atof_stream, "atif": self.atif, "opentelemetry": self.opentelemetry, "openinference": self.openinference, @@ -222,6 +241,7 @@ def to_dict(self) -> JsonObject: __all__ = [ "ConfigPolicy", "AtofConfig", + "AtofStreamConfig", "AtifConfig", "S3StorageConfig", "OtlpConfig", diff --git a/python/nemo_relay/observability.pyi b/python/nemo_relay/observability.pyi index 405cb39b..36abd928 100644 --- a/python/nemo_relay/observability.pyi +++ b/python/nemo_relay/observability.pyi @@ -25,6 +25,12 @@ class AtofConfig: mode: Literal["append", "overwrite"] = ... 
def to_dict(self) -> JsonObject: ... +@dataclass(slots=True) +class AtofStreamConfig: + enabled: bool = ... + address: str | None = ... + def to_dict(self) -> JsonObject: ... + @dataclass(slots=True) class S3StorageConfig: bucket: str = ... @@ -68,6 +74,7 @@ class OtlpConfig: class ObservabilityConfig: version: int = ... atof: AtofConfig | None = ... + atof_stream: AtofStreamConfig | None = ... atif: AtifConfig | None = ... opentelemetry: OtlpConfig | None = ... openinference: OtlpConfig | None = ... diff --git a/python/tests/test_observability_plugin.py b/python/tests/test_observability_plugin.py index dba92d04..87c38b38 100644 --- a/python/tests/test_observability_plugin.py +++ b/python/tests/test_observability_plugin.py @@ -15,6 +15,7 @@ OBSERVABILITY_PLUGIN_KIND, AtifConfig, AtofConfig, + AtofStreamConfig, ComponentSpec, ObservabilityConfig, OtlpConfig, @@ -28,6 +29,7 @@ class TestObservabilityConfigHelpers: def test_defaults_and_component_wrapper(self): assert AtofConfig().to_dict() == {"enabled": False, "mode": "append"} + assert AtofStreamConfig().to_dict() == {"enabled": False} assert AtifConfig().to_dict() == { "enabled": False, "agent_name": "NeMo Relay", @@ -58,6 +60,7 @@ def test_validation_rejects_bad_values(self): { "version": 1, "atof": {"mode": "bad"}, + "atof_stream": {"enabled": True}, "atif": {"filename_template": "missing-placeholder"}, } ) @@ -65,7 +68,7 @@ def test_validation_rejects_bad_values(self): ) ) fields = {diag.get("field") for diag in report["diagnostics"]} - assert {"mode", "filename_template"} <= fields + assert {"mode", "address", "filename_template"} <= fields def test_list_kinds_includes_builtin_observability(self): assert OBSERVABILITY_PLUGIN_KIND in plugin.list_kinds()