From 24c36d4ecf2cbf4237a61e92a133de2e967c7614 Mon Sep 17 00:00:00 2001 From: Janmejay Singh Date: Fri, 12 Jun 2026 11:00:21 +0530 Subject: [PATCH 1/9] Feat[OBE-10264] - Create disk-buffer with some data to verify backwards compatibility with EventArray enq_tm change --- .../buffer/v2/legacy/buffer-data-0.dat | Bin 0 -> 288 bytes .../buffer/v2/legacy/buffer.db | Bin 0 -> 24 bytes .../buffer/v2/legacy/buffer.lock | 0 .../tests/legacy_disk_buffer_fixture_gen.rs | 81 ++++++++++++++++++ 4 files changed, 81 insertions(+) create mode 100644 lib/vector-core/tests/data/fixtures/legacy_disk_buffer_v2/buffer/v2/legacy/buffer-data-0.dat create mode 100644 lib/vector-core/tests/data/fixtures/legacy_disk_buffer_v2/buffer/v2/legacy/buffer.db create mode 100644 lib/vector-core/tests/data/fixtures/legacy_disk_buffer_v2/buffer/v2/legacy/buffer.lock create mode 100644 lib/vector-core/tests/legacy_disk_buffer_fixture_gen.rs diff --git a/lib/vector-core/tests/data/fixtures/legacy_disk_buffer_v2/buffer/v2/legacy/buffer-data-0.dat b/lib/vector-core/tests/data/fixtures/legacy_disk_buffer_v2/buffer/v2/legacy/buffer-data-0.dat new file mode 100644 index 0000000000000000000000000000000000000000..ce9935714a8a249aaeed1f1815f486d8891044b2 GIT binary patch literal 288 zcmZQz0D}!&ZH!!wT>M$DWpOX literal 0 HcmV?d00001 diff --git a/lib/vector-core/tests/data/fixtures/legacy_disk_buffer_v2/buffer/v2/legacy/buffer.db b/lib/vector-core/tests/data/fixtures/legacy_disk_buffer_v2/buffer/v2/legacy/buffer.db new file mode 100644 index 0000000000000000000000000000000000000000..aa3a2c956dfe29e5213170561b1fef4e6fa83fed GIT binary patch literal 24 KcmZQ!zzzTacmM?e literal 0 HcmV?d00001 diff --git a/lib/vector-core/tests/data/fixtures/legacy_disk_buffer_v2/buffer/v2/legacy/buffer.lock b/lib/vector-core/tests/data/fixtures/legacy_disk_buffer_v2/buffer/v2/legacy/buffer.lock new file mode 100644 index 0000000000..e69de29bb2 diff --git a/lib/vector-core/tests/legacy_disk_buffer_fixture_gen.rs 
b/lib/vector-core/tests/legacy_disk_buffer_fixture_gen.rs new file mode 100644 index 0000000000..af4c6b5f84 --- /dev/null +++ b/lib/vector-core/tests/legacy_disk_buffer_fixture_gen.rs @@ -0,0 +1,81 @@ +//! Generator for the legacy disk-buffer fixture used by backwards-compatibility tests. +//! +//! Run on a commit that PRE-dates the `enq_tm` field on `proto::EventArray`. The resulting +//! buffer files become the fixture for tests that verify pre-tag-4 records still decode. +//! +//! Run with: +//! +//! ```text +//! cargo test -p vector-core --test legacy_disk_buffer_fixture_gen -- --ignored --nocapture +//! ``` +//! +//! Output: `lib/vector-core/tests/data/fixtures/legacy_disk_buffer_v2/buffer/v2/legacy/`. + +use std::{num::NonZeroU64, path::PathBuf}; + +use tracing::Span; +use vector_buffers::{topology::builder::TopologyBuilder, BufferType, WhenFull}; +use vector_core::event::{EventArray, LogEvent, Metric, MetricKind, MetricValue}; + +const FIXTURE_SUBDIR: &str = "tests/data/fixtures/legacy_disk_buffer_v2"; +const BUFFER_ID: &str = "legacy"; +const MAX_SIZE: u64 = 268_435_488; + +#[tokio::test] +#[ignore = "fixture generator; run explicitly and check in the output"] +async fn generate_legacy_fixture() { + let fixture_root = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join(FIXTURE_SUBDIR); + + if fixture_root.exists() { + std::fs::remove_dir_all(&fixture_root).expect("clean fixture dir"); + } + std::fs::create_dir_all(&fixture_root).expect("create fixture dir"); + + let mut builder = TopologyBuilder::::default(); + BufferType::DiskV2 { + when_full: WhenFull::Block, + max_size: NonZeroU64::new(MAX_SIZE).unwrap(), + } + .add_to_builder( + &mut builder, + Some(fixture_root.clone()), + BUFFER_ID.to_string(), + ) + .expect("add disk_v2 stage"); + + let (mut sender, _receiver) = builder + .build(BUFFER_ID.to_string(), Span::current()) + .await + .expect("build topology"); + + for ea in fixture_records() { + sender.send(ea, None).await.expect("send"); + } + 
sender.flush().await.expect("flush"); + drop(sender); + + eprintln!( + "legacy disk-buffer fixture written to {}", + fixture_root.display() + ); +} + +fn fixture_records() -> Vec { + let mut l1 = LogEvent::default(); + l1.insert("message", "hello"); + l1.insert("level", "info"); + let mut l2 = LogEvent::default(); + l2.insert("message", "world"); + l2.insert("level", "warn"); + + let counter = Metric::new( + "test_counter", + MetricKind::Incremental, + MetricValue::Counter { value: 42.0 }, + ); + + vec![ + EventArray::Logs(vec![l1, l2]), + EventArray::Metrics(vec![counter]), + ] +} From b5c9f6f559b7788291fb1d996abbc1feabea5729 Mon Sep 17 00:00:00 2001 From: Janmejay Singh Date: Fri, 12 Jun 2026 16:19:14 +0530 Subject: [PATCH 2/9] Feat[OBE-10264] - Create a trait to support enqueue-time aware EventArray in buffers --- lib/vector-buffers/src/lib.rs | 3 + lib/vector-buffers/src/timed.rs | 198 +++++++++++++++++++++++++++++ lib/vector-core/proto/event.proto | 1 + lib/vector-core/src/event/proto.rs | 5 +- lib/vector-core/src/event/ser.rs | 155 +++++++++++++++++++++- 5 files changed, 360 insertions(+), 2 deletions(-) create mode 100644 lib/vector-buffers/src/timed.rs diff --git a/lib/vector-buffers/src/lib.rs b/lib/vector-buffers/src/lib.rs index 84e88b6591..020bac5cd6 100644 --- a/lib/vector-buffers/src/lib.rs +++ b/lib/vector-buffers/src/lib.rs @@ -29,8 +29,11 @@ mod internal_events; #[cfg(test)] pub mod test; +mod timed; pub mod topology; +pub use timed::{Clock, SystemClock, Timed, TimedEncodable}; + pub(crate) mod variants; use std::fmt::Debug; diff --git a/lib/vector-buffers/src/timed.rs b/lib/vector-buffers/src/timed.rs new file mode 100644 index 0000000000..1e5e67e688 --- /dev/null +++ b/lib/vector-buffers/src/timed.rs @@ -0,0 +1,198 @@ +//! Wrapper carrying an enqueue timestamp alongside an item moving through a buffer. 
+ +use std::time::{Duration, SystemTime}; + +use bytes::{Buf, BufMut}; +use vector_common::{ + byte_size_of::ByteSizeOf, + finalization::{AddBatchNotifier, BatchNotifier, EventFinalizers, Finalizable}, +}; + +use crate::{encoding::Encodable, EventCount}; + +/// Source of wall-clock time. Real implementations return `SystemTime::now()`; tests can supply +/// a controllable mock so that elapsed-time assertions are deterministic. +pub trait Clock: Send + Sync + 'static { + fn now(&self) -> SystemTime; +} + +#[derive(Debug, Default, Clone, Copy)] +pub struct SystemClock; + +impl Clock for SystemClock { + fn now(&self) -> SystemTime { + SystemTime::now() + } +} + +/// A bufferable item paired with the wall-clock time at which it entered a buffer. +/// +/// `enq_tm == None` denotes "unknown enqueue time" — typically a record decoded from a buffer +/// written by a Vector binary that predates this field. Receivers treat that as zero delay. +#[derive(Debug, Clone)] +pub struct Timed { + pub inner: T, + pub enq_tm: Option, +} + +impl Timed { + pub fn stamped(inner: T, clock: &dyn Clock) -> Self { + Self { + inner, + enq_tm: Some(clock.now()), + } + } + + pub fn untimed(inner: T) -> Self { + Self { + inner, + enq_tm: None, + } + } + + /// Duration since `enq_tm` measured against `clock`. Returns `Duration::ZERO` when the + /// timestamp is missing or when the clock has moved backwards (e.g. NTP correction). 
+ pub fn elapsed(&self, clock: &dyn Clock) -> Duration { + match self.enq_tm { + None => Duration::ZERO, + Some(t) => clock.now().duration_since(t).unwrap_or(Duration::ZERO), + } + } + + pub fn into_inner(self) -> T { + self.inner + } +} + +impl ByteSizeOf for Timed { + fn allocated_bytes(&self) -> usize { + self.inner.allocated_bytes() + } +} + +impl EventCount for Timed { + fn event_count(&self) -> usize { + self.inner.event_count() + } +} + +impl AddBatchNotifier for Timed { + fn add_batch_notifier(&mut self, notifier: BatchNotifier) { + self.inner.add_batch_notifier(notifier); + } +} + +impl Finalizable for Timed { + fn take_finalizers(&mut self) -> EventFinalizers { + self.inner.take_finalizers() + } +} + +pub trait TimedEncodable: Encodable { + fn encode_with_enq_tm( + self, + enq_tm: Option, + buffer: &mut B, + ) -> Result<(), Self::EncodeError>; + + fn decode_with_enq_tm( + metadata: Self::Metadata, + buffer: B, + ) -> Result<(Self, Option), Self::DecodeError>; +} + +impl Encodable for Timed { + type Metadata = T::Metadata; + type EncodeError = T::EncodeError; + type DecodeError = T::DecodeError; + + fn get_metadata() -> Self::Metadata { + T::get_metadata() + } + + fn can_decode(metadata: Self::Metadata) -> bool { + T::can_decode(metadata) + } + + fn encode(self, buffer: &mut B) -> Result<(), Self::EncodeError> { + self.inner.encode_with_enq_tm(self.enq_tm, buffer) + } + + fn decode( + metadata: Self::Metadata, + buffer: B, + ) -> Result { + T::decode_with_enq_tm(metadata, buffer).map(|(inner, enq_tm)| Timed { inner, enq_tm }) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Mutex; + + use super::*; + + /// Test clock with explicit set/advance for deterministic elapsed-time assertions. 
+ pub(crate) struct MockClock(Mutex); + + impl MockClock { + pub fn new(t: SystemTime) -> Self { + Self(Mutex::new(t)) + } + + pub fn advance(&self, by: Duration) { + let mut g = self.0.lock().unwrap(); + *g += by; + } + + pub fn set(&self, t: SystemTime) { + *self.0.lock().unwrap() = t; + } + } + + impl Clock for MockClock { + fn now(&self) -> SystemTime { + *self.0.lock().unwrap() + } + } + + #[test] + fn stamped_records_clock_now() { + let t0 = SystemTime::UNIX_EPOCH + Duration::from_secs(1_700_000_000); + let clock = MockClock::new(t0); + let timed = Timed::stamped(42_u64, &clock); + assert_eq!(timed.enq_tm, Some(t0)); + assert_eq!(timed.inner, 42); + } + + #[test] + fn untimed_has_no_timestamp() { + let timed = Timed::untimed(42_u64); + assert_eq!(timed.enq_tm, None); + } + + #[test] + fn elapsed_reads_delta_from_clock() { + let t0 = SystemTime::UNIX_EPOCH + Duration::from_secs(1_700_000_000); + let clock = MockClock::new(t0); + let timed = Timed::stamped(0_u64, &clock); + clock.advance(Duration::from_millis(250)); + assert_eq!(timed.elapsed(&clock), Duration::from_millis(250)); + } + + #[test] + fn elapsed_is_zero_when_clock_moved_backwards() { + let t0 = SystemTime::UNIX_EPOCH + Duration::from_secs(1_700_000_000); + let clock = MockClock::new(t0); + let timed = Timed::stamped(0_u64, &clock); + clock.set(t0 - Duration::from_secs(60)); + assert_eq!(timed.elapsed(&clock), Duration::ZERO); + } + + #[test] + fn elapsed_is_zero_when_enq_tm_is_none() { + let clock = MockClock::new(SystemTime::UNIX_EPOCH); + let timed: Timed = Timed::untimed(0); + assert_eq!(timed.elapsed(&clock), Duration::ZERO); + } +} diff --git a/lib/vector-core/proto/event.proto b/lib/vector-core/proto/event.proto index 303aa6c2fe..8f6ffb3ab9 100644 --- a/lib/vector-core/proto/event.proto +++ b/lib/vector-core/proto/event.proto @@ -9,6 +9,7 @@ message EventArray { MetricArray metrics = 2; TraceArray traces = 3; } + google.protobuf.Timestamp enq_tm = 4; } message LogArray { diff --git 
a/lib/vector-core/src/event/proto.rs b/lib/vector-core/src/event/proto.rs index f1c926b49e..cd5cea8ead 100644 --- a/lib/vector-core/src/event/proto.rs +++ b/lib/vector-core/src/event/proto.rs @@ -45,7 +45,10 @@ impl From for EventArray { array::EventArray::Metrics(array) => event_array::Events::from_metrics(array), array::EventArray::Traces(array) => event_array::Events::from_traces(array), }); - Self { events } + Self { + events, + enq_tm: None, + } } } diff --git a/lib/vector-core/src/event/ser.rs b/lib/vector-core/src/event/ser.rs index be41910466..71bb0916fa 100644 --- a/lib/vector-core/src/event/ser.rs +++ b/lib/vector-core/src/event/ser.rs @@ -1,8 +1,13 @@ +use std::time::{Duration, SystemTime}; + use bytes::{Buf, BufMut}; use enumflags2::{bitflags, BitFlags, FromBitsError}; use prost::Message; use snafu::Snafu; -use vector_buffers::encoding::{AsMetadata, Encodable}; +use vector_buffers::{ + encoding::{AsMetadata, Encodable}, + TimedEncodable, +}; use super::{proto, Event, EventArray}; @@ -117,3 +122,151 @@ impl Encodable for EventArray { } } } + +impl EventArray { + fn enq_tm_to_proto(t: SystemTime) -> prost_types::Timestamp { + let dur = t + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap_or(Duration::ZERO); + prost_types::Timestamp { + seconds: dur.as_secs() as i64, + nanos: dur.subsec_nanos() as i32, + } + } + + fn enq_tm_from_proto(t: prost_types::Timestamp) -> Option { + if t.seconds < 0 || t.nanos < 0 { + return None; + } + SystemTime::UNIX_EPOCH.checked_add(Duration::new(t.seconds as u64, t.nanos as u32)) + } +} + +impl TimedEncodable for EventArray { + fn encode_with_enq_tm( + self, + enq_tm: Option, + buffer: &mut B, + ) -> Result<(), Self::EncodeError> + where + B: BufMut, + { + let mut p = proto::EventArray::from(self); + p.enq_tm = enq_tm.map(Self::enq_tm_to_proto); + p.encode(buffer).map_err(|_| EncodeError::BufferTooSmall) + } + + fn decode_with_enq_tm( + metadata: Self::Metadata, + buffer: B, + ) -> Result<(Self, Option), Self::DecodeError> 
+ where + B: Buf + Clone, + { + if !metadata.contains(EventEncodableMetadataFlags::DiskBufferV1CompatibilityMode) { + return Err(DecodeError::UnsupportedEncodingMetadata); + } + match proto::EventArray::decode(buffer.clone()) { + Ok(mut p) => { + let enq_tm = p.enq_tm.take().and_then(Self::enq_tm_from_proto); + Ok((EventArray::from(p), enq_tm)) + } + Err(_) => { + let pe = proto::EventWrapper::decode(buffer) + .map_err(|_| DecodeError::InvalidProtobufPayload)?; + Ok((EventArray::from(Event::from(pe)), None)) + } + } + } +} + +#[cfg(test)] +mod tests { + use bytes::BytesMut; + use vector_buffers::Timed; + + use super::*; + use crate::event::{LogEvent, Metric, MetricKind, MetricValue}; + + fn sample_logs() -> EventArray { + let mut l = LogEvent::default(); + l.insert("message", "hello"); + EventArray::Logs(vec![l]) + } + + fn sample_metrics() -> EventArray { + let m = Metric::new( + "n", + MetricKind::Incremental, + MetricValue::Counter { value: 1.0 }, + ); + EventArray::Metrics(vec![m]) + } + + fn encode(item: E) -> BytesMut { + let mut buf = BytesMut::new(); + item.encode(&mut buf).expect("encode"); + buf + } + + fn metadata() -> EventEncodableMetadata { + EventEncodableMetadataFlags::DiskBufferV1CompatibilityMode.into() + } + + #[test] + fn timed_roundtrip_with_timestamp() { + let t = SystemTime::UNIX_EPOCH + + Duration::from_secs(1_700_000_000) + + Duration::from_nanos(123_456_789); + let original = Timed { + inner: sample_logs(), + enq_tm: Some(t), + }; + + let bytes = encode(original.clone()); + let decoded = + Timed::::decode(metadata(), bytes.freeze()).expect("decode"); + + assert_eq!(decoded.enq_tm, Some(t)); + assert_eq!(decoded.inner, original.inner); + } + + #[test] + fn timed_roundtrip_without_timestamp() { + let original = Timed { + inner: sample_metrics(), + enq_tm: None, + }; + + let bytes = encode(original.clone()); + let decoded = + Timed::::decode(metadata(), bytes.freeze()).expect("decode"); + + assert_eq!(decoded.enq_tm, None); + 
assert_eq!(decoded.inner, original.inner); + } + + #[test] + fn old_reader_ignores_new_timestamp_field() { + let t = SystemTime::UNIX_EPOCH + Duration::from_secs(1_700_000_000); + let timed = Timed { + inner: sample_logs(), + enq_tm: Some(t), + }; + let bytes = encode(timed.clone()); + + let decoded = EventArray::decode(metadata(), bytes.freeze()).expect("decode"); + assert_eq!(decoded, timed.inner); + } + + #[test] + fn timed_decodes_legacy_bytes_with_none() { + let ea = sample_logs(); + let bytes = encode(ea.clone()); + + let decoded = + Timed::::decode(metadata(), bytes.freeze()).expect("decode"); + assert_eq!(decoded.enq_tm, None); + assert_eq!(decoded.inner, ea); + } +} From cb3e0967b14be04e84112017f2c2940a3285ea4e Mon Sep 17 00:00:00 2001 From: Janmejay Singh Date: Fri, 12 Jun 2026 19:23:03 +0530 Subject: [PATCH 3/9] Feat[OBE-10264] - Wire TimedBufferable into topology buffers usage --- lib/vector-buffers/src/config.rs | 13 +- lib/vector-buffers/src/internal_events.rs | 16 +++ lib/vector-buffers/src/lib.rs | 2 +- lib/vector-buffers/src/test/variant.rs | 4 +- lib/vector-buffers/src/timed.rs | 25 +++- lib/vector-buffers/src/topology/builder.rs | 81 ++++++++--- .../src/topology/channel/receiver.rs | 129 ++++++++++++------ .../src/topology/channel/sender.rs | 76 ++++++++--- .../src/topology/channel/tests.rs | 8 +- lib/vector-buffers/src/topology/test_util.rs | 8 +- .../src/variants/disk_v2/mod.rs | 4 +- lib/vector-buffers/src/variants/in_memory.rs | 4 +- lib/vector-core/src/fanout.rs | 1 + lib/vector-tap/src/controller.rs | 2 +- src/topology/builder.rs | 12 +- 15 files changed, 283 insertions(+), 102 deletions(-) diff --git a/lib/vector-buffers/src/config.rs b/lib/vector-buffers/src/config.rs index 14826b6c20..07d3188795 100644 --- a/lib/vector-buffers/src/config.rs +++ b/lib/vector-buffers/src/config.rs @@ -17,7 +17,7 @@ use crate::{ channel::{BufferReceiver, BufferSender}, }, variants::{DiskV2Buffer, MemoryBuffer}, - Bufferable, WhenFull, + WhenFull, }; 
#[derive(Debug, Snafu)] @@ -274,7 +274,7 @@ impl BufferType { id: String, ) -> Result<(), BufferBuildError> where - T: Bufferable + Clone + Finalizable, + T: crate::TimedBufferable + Finalizable, { match *self { BufferType::Memory { @@ -366,20 +366,21 @@ impl BufferConfig { pub async fn build( &self, data_dir: Option, - buffer_id: String, + id: impl Into, span: Span, ) -> Result<(BufferSender, BufferReceiver), BufferBuildError> where - T: Bufferable + Clone + Finalizable, + T: crate::TimedBufferable + Finalizable, { + let component_key = id.into(); let mut builder = TopologyBuilder::default(); for stage in self.stages() { - stage.add_to_builder(&mut builder, data_dir.clone(), buffer_id.clone())?; + stage.add_to_builder(&mut builder, data_dir.clone(), component_key.id().to_string())?; } builder - .build(buffer_id, span) + .build(component_key, span) .await .context(FailedToBuildTopologySnafu) } diff --git a/lib/vector-buffers/src/internal_events.rs b/lib/vector-buffers/src/internal_events.rs index 0cda5beec4..547f6bb19f 100644 --- a/lib/vector-buffers/src/internal_events.rs +++ b/lib/vector-buffers/src/internal_events.rs @@ -137,3 +137,19 @@ registered_event! { self.send_duration.record(duration); } } + +registered_event! 
{ + BufferQueueDelay { + component_id: String, + stage: usize, + } => { + queue_delay: Histogram = histogram!( + "topology_queue_delay_seconds", + "component_id" => self.component_id.clone(), + "stage" => self.stage.to_string()), + } + + fn emit(&self, duration: Duration) { + self.queue_delay.record(duration); + } +} diff --git a/lib/vector-buffers/src/lib.rs b/lib/vector-buffers/src/lib.rs index 020bac5cd6..57baf399af 100644 --- a/lib/vector-buffers/src/lib.rs +++ b/lib/vector-buffers/src/lib.rs @@ -32,7 +32,7 @@ pub mod test; mod timed; pub mod topology; -pub use timed::{Clock, SystemClock, Timed, TimedEncodable}; +pub use timed::{Clock, SystemClock, Timed, TimedBufferable, TimedEncodable}; pub(crate) mod variants; diff --git a/lib/vector-buffers/src/test/variant.rs b/lib/vector-buffers/src/test/variant.rs index a14995d86b..a935cca3fc 100644 --- a/lib/vector-buffers/src/test/variant.rs +++ b/lib/vector-buffers/src/test/variant.rs @@ -14,7 +14,7 @@ use crate::{ channel::{BufferReceiver, BufferSender}, }, variants::{DiskV2Buffer, MemoryBuffer}, - Bufferable, WhenFull, + TimedBufferable, WhenFull, }; #[cfg(test)] @@ -45,7 +45,7 @@ pub enum Variant { impl Variant { pub async fn create_sender_receiver(&self) -> (BufferSender, BufferReceiver) where - T: Bufferable + Clone + Finalizable, + T: TimedBufferable + Finalizable, { let mut builder = TopologyBuilder::default(); match self { diff --git a/lib/vector-buffers/src/timed.rs b/lib/vector-buffers/src/timed.rs index 1e5e67e688..f05dce0add 100644 --- a/lib/vector-buffers/src/timed.rs +++ b/lib/vector-buffers/src/timed.rs @@ -8,7 +8,10 @@ use vector_common::{ finalization::{AddBatchNotifier, BatchNotifier, EventFinalizers, Finalizable}, }; -use crate::{encoding::Encodable, EventCount}; +use crate::{ + encoding::{Encodable, FixedEncodable}, + EventCount, +}; /// Source of wall-clock time. 
Real implementations return `SystemTime::now()`; tests can supply /// a controllable mock so that elapsed-time assertions are deterministic. @@ -101,6 +104,26 @@ pub trait TimedEncodable: Encodable { ) -> Result<(Self, Option), Self::DecodeError>; } +impl TimedEncodable for T { + fn encode_with_enq_tm( + self, + _enq_tm: Option, + buffer: &mut B, + ) -> Result<(), Self::EncodeError> { + ::encode(self, buffer) + } + + fn decode_with_enq_tm( + metadata: Self::Metadata, + buffer: B, + ) -> Result<(Self, Option), Self::DecodeError> { + ::decode(metadata, buffer).map(|v| (v, None)) + } +} + +pub trait TimedBufferable: TimedEncodable + crate::InMemoryBufferable + Clone {} +impl TimedBufferable for T where T: TimedEncodable + crate::InMemoryBufferable + Clone {} + impl Encodable for Timed { type Metadata = T::Metadata; type EncodeError = T::EncodeError; diff --git a/lib/vector-buffers/src/topology/builder.rs b/lib/vector-buffers/src/topology/builder.rs index 51f52900ec..ff4fd3c57d 100644 --- a/lib/vector-buffers/src/topology/builder.rs +++ b/lib/vector-buffers/src/topology/builder.rs @@ -1,20 +1,25 @@ -use std::{error::Error, num::NonZeroUsize}; +use std::{error::Error, num::NonZeroUsize, sync::Arc}; use async_trait::async_trait; use snafu::{ResultExt, Snafu}; use tracing::Span; +use vector_common::{config::ComponentKey, internal_event::register}; use super::channel::{ReceiverAdapter, SenderAdapter}; use crate::{ buffer_usage_data::{BufferUsage, BufferUsageHandle}, + internal_events::BufferQueueDelay, topology::channel::{BufferReceiver, BufferSender}, variants::MemoryBuffer, - Bufferable, WhenFull, + Clock, SystemClock, TimedBufferable, WhenFull, }; /// Value that can be used as a stage in a buffer topology. #[async_trait] -pub trait IntoBuffer: Send { +pub trait IntoBuffer: Send +where + T: TimedBufferable, +{ /// Gets whether or not this buffer stage provides its own instrumentation, or if it should be /// instrumented from the outside. 
/// @@ -57,17 +62,26 @@ pub enum TopologyError { StackedAcks, } -struct TopologyStage { +struct TopologyStage +where + T: TimedBufferable, +{ untransformed: Box>, when_full: WhenFull, } /// Builder for constructing buffer topologies. -pub struct TopologyBuilder { +pub struct TopologyBuilder +where + T: TimedBufferable, +{ stages: Vec>, } -impl TopologyBuilder { +impl TopologyBuilder +where + T: TimedBufferable, +{ /// Adds a new stage to the buffer topology. /// /// The "when full" behavior can be optionally configured here. If no behavior is specified, @@ -106,9 +120,12 @@ impl TopologyBuilder { /// explaining the issue. pub async fn build( self, - buffer_id: String, + id: impl Into, span: Span, ) -> Result<(BufferSender, BufferReceiver), TopologyError> { + let component_id = id.into().into_id(); + let clock: Arc = Arc::new(SystemClock); + // We pop stages off in reverse order to build from the inside out. let mut buffer_usage = BufferUsage::from_span(span.clone()); let mut current_stage = None; @@ -143,14 +160,24 @@ impl TopologyBuilder { .await .context(FailedToBuildStageSnafu { stage_idx })?; + let queue_delay = register(BufferQueueDelay { + component_id: component_id.clone(), + stage: stage_idx, + }); + let (mut sender, mut receiver) = match current_stage.take() { None => ( - BufferSender::new(sender, stage.when_full), - BufferReceiver::new(receiver), + BufferSender::new(sender, stage.when_full, Arc::clone(&clock)), + BufferReceiver::new(receiver, queue_delay, Arc::clone(&clock)), ), Some((current_sender, current_receiver)) => ( - BufferSender::with_overflow(sender, current_sender), - BufferReceiver::with_overflow(receiver, current_receiver), + BufferSender::with_overflow(sender, current_sender, Arc::clone(&clock)), + BufferReceiver::with_overflow( + receiver, + current_receiver, + queue_delay, + Arc::clone(&clock), + ), ), }; @@ -168,13 +195,16 @@ impl TopologyBuilder { // Install the buffer usage handler since we successfully created the buffer topology. 
This // spawns it in the background and periodically emits aggregated metrics about each of the // buffer stages. - buffer_usage.install(buffer_id.as_str()); + buffer_usage.install(component_id.as_str()); Ok((sender, receiver)) } } -impl TopologyBuilder { +impl TopologyBuilder +where + T: TimedBufferable, +{ /// Creates a memory-only buffer topology. /// /// The overflow mode (i.e. `WhenFull`) can be configured to either block or drop the newest @@ -189,8 +219,11 @@ impl TopologyBuilder { pub async fn standalone_memory( max_events: NonZeroUsize, when_full: WhenFull, + id: impl Into, receiver_span: &Span, ) -> (BufferSender, BufferReceiver) { + let component_id = id.into().into_id(); + let clock: Arc = Arc::new(SystemClock); let usage_handle = BufferUsageHandle::noop(); let memory_buffer = Box::new(MemoryBuffer::new(max_events)); @@ -203,9 +236,13 @@ impl TopologyBuilder { WhenFull::Overflow => WhenFull::Block, m => m, }; - let mut sender = BufferSender::new(sender, mode); + let mut sender = BufferSender::new(sender, mode, Arc::clone(&clock)); sender.with_send_duration_instrumentation(0, receiver_span); - let receiver = BufferReceiver::new(receiver); + let queue_delay = register(BufferQueueDelay { + component_id, + stage: 0, + }); + let receiver = BufferReceiver::new(receiver, queue_delay, clock); (sender, receiver) } @@ -229,6 +266,7 @@ impl TopologyBuilder { when_full: WhenFull, usage_handle: BufferUsageHandle, ) -> (BufferSender, BufferReceiver) { + let clock: Arc = Arc::new(SystemClock); let memory_buffer = Box::new(MemoryBuffer::new(max_events)); let (sender, receiver) = memory_buffer .into_buffer_parts(usage_handle.clone()) @@ -239,8 +277,12 @@ impl TopologyBuilder { WhenFull::Overflow => WhenFull::Block, m => m, }; - let mut sender = BufferSender::new(sender, mode); - let mut receiver = BufferReceiver::new(receiver); + let queue_delay = register(BufferQueueDelay { + component_id: String::from(""), + stage: 0, + }); + let mut sender = BufferSender::new(sender, 
mode, Arc::clone(&clock)); + let mut receiver = BufferReceiver::new(receiver, queue_delay, clock); sender.with_usage_instrumentation(usage_handle.clone()); receiver.with_usage_instrumentation(usage_handle); @@ -249,7 +291,10 @@ impl TopologyBuilder { } } -impl Default for TopologyBuilder { +impl Default for TopologyBuilder +where + T: TimedBufferable, +{ fn default() -> Self { Self { stages: Vec::new() } } diff --git a/lib/vector-buffers/src/topology/channel/receiver.rs b/lib/vector-buffers/src/topology/channel/receiver.rs index d21aa1ed67..8df3884ea8 100644 --- a/lib/vector-buffers/src/topology/channel/receiver.rs +++ b/lib/vector-buffers/src/topology/channel/receiver.rs @@ -1,49 +1,64 @@ use std::{ mem, pin::Pin, + sync::Arc, task::{ready, Context, Poll}, }; use async_recursion::async_recursion; +use derivative::Derivative; use futures::Stream; use tokio::select; use tokio_util::sync::ReusableBoxFuture; -use vector_common::internal_event::emit; +use vector_common::{ + byte_size_of::ByteSizeOf, + internal_event::{emit, InternalEventHandle, Registered}, +}; use super::limited_queue::LimitedReceiver; use crate::{ buffer_usage_data::BufferUsageHandle, + internal_events::BufferQueueDelay, variants::disk_v2::{self, ProductionFilesystem}, - Bufferable, + Clock, EventCount, Timed, TimedBufferable, }; /// Adapter for papering over various receiver backends. #[derive(Debug)] -pub enum ReceiverAdapter { +pub enum ReceiverAdapter +where + T: TimedBufferable, +{ /// The in-memory channel buffer. - InMemory(LimitedReceiver), + InMemory(LimitedReceiver>), /// The disk v2 buffer. 
- DiskV2(disk_v2::BufferReader), + DiskV2(disk_v2::BufferReader, ProductionFilesystem>), } -impl From> for ReceiverAdapter { - fn from(v: LimitedReceiver) -> Self { +impl From>> for ReceiverAdapter +where + T: TimedBufferable, +{ + fn from(v: LimitedReceiver>) -> Self { Self::InMemory(v) } } -impl From> for ReceiverAdapter { - fn from(v: disk_v2::BufferReader) -> Self { +impl From, ProductionFilesystem>> for ReceiverAdapter +where + T: TimedBufferable, +{ + fn from(v: disk_v2::BufferReader, ProductionFilesystem>) -> Self { Self::DiskV2(v) } } impl ReceiverAdapter where - T: Bufferable, + T: TimedBufferable, { - pub(crate) async fn next(&mut self) -> Option { + pub(crate) async fn next(&mut self) -> Option> { match self { ReceiverAdapter::InMemory(rx) => rx.next().await, ReceiverAdapter::DiskV2(reader) => loop { @@ -72,29 +87,53 @@ where /// for querying the overflow buffer as well. The ordering of events when operating in "overflow" /// is undefined, as the receiver will try to manage polling both its own buffer, as well as the /// overflow buffer, in order to fairly balance throughput. -#[derive(Debug)] -pub struct BufferReceiver { +#[derive(Derivative)] +#[derivative(Debug)] +pub struct BufferReceiver +where + T: TimedBufferable, +{ base: ReceiverAdapter, overflow: Option>>, instrumentation: Option, + #[derivative(Debug = "ignore")] + queue_delay: Registered, + #[derivative(Debug = "ignore")] + clock: Arc, } -impl BufferReceiver { +impl BufferReceiver +where + T: TimedBufferable, +{ /// Creates a new [`BufferReceiver`] wrapping the given channel receiver. - pub fn new(base: ReceiverAdapter) -> Self { + pub fn new( + base: ReceiverAdapter, + queue_delay: Registered, + clock: Arc, + ) -> Self { Self { base, overflow: None, instrumentation: None, + queue_delay, + clock, } } /// Creates a new [`BufferReceiver`] wrapping the given channel receiver and overflow receiver. 
- pub fn with_overflow(base: ReceiverAdapter, overflow: BufferReceiver) -> Self { + pub fn with_overflow( + base: ReceiverAdapter, + overflow: BufferReceiver, + queue_delay: Registered, + clock: Arc, + ) -> Self { Self { base, overflow: Some(Box::new(overflow)), instrumentation: None, + queue_delay, + clock, } } @@ -124,32 +163,31 @@ impl BufferReceiver { // attached to the base receiver. let overflow = self.overflow.as_mut().map(Pin::new); - let (item, from_base) = match overflow { - None => match self.base.next().await { - Some(item) => (item, true), - None => return None, - }, + match overflow { + None => self.base.next().await.map(|timed| self.with_telemetry(timed)), Some(mut overflow) => { select! { - Some(item) = overflow.next() => (item, false), - Some(item) = self.base.next() => (item, true), - else => return None, + Some(timed) = self.base.next() => Some(self.with_telemetry(timed)), + Some(item) = overflow.next() => Some(item), + else => None, } } - }; + } + } + + fn with_telemetry(&self, item: Timed) -> T { + self.queue_delay.emit(item.elapsed(&*self.clock)); // If instrumentation is enabled, and we got the item from the base receiver, then and only // then do we track sending the event out. 
if let Some(handle) = self.instrumentation.as_ref() { - if from_base { - handle.increment_sent_event_count_and_byte_size( - item.event_count() as u64, - item.size_of() as u64, - ); - } + handle.increment_sent_event_count_and_byte_size( + item.event_count() as u64, + item.size_of() as u64, + ); } - Some(item) + item.into_inner() } pub fn into_stream(self) -> BufferReceiverStream { @@ -157,18 +195,27 @@ impl BufferReceiver { } } -enum StreamState { +enum StreamState +where + T: TimedBufferable, +{ Idle(BufferReceiver), Polling, Closed, } -pub struct BufferReceiverStream { +pub struct BufferReceiverStream +where + T: TimedBufferable, +{ state: StreamState, recv_fut: ReusableBoxFuture<'static, (Option, BufferReceiver)>, } -impl BufferReceiverStream { +impl BufferReceiverStream +where + T: TimedBufferable, +{ pub fn new(receiver: BufferReceiver) -> Self { Self { state: StreamState::Idle(receiver), @@ -177,7 +224,10 @@ impl BufferReceiverStream { } } -impl Stream for BufferReceiverStream { +impl Stream for BufferReceiverStream +where + T: TimedBufferable, +{ type Item = T; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { @@ -205,9 +255,12 @@ impl Stream for BufferReceiverStream { } } -async fn make_recv_future( +async fn make_recv_future( receiver: Option>, -) -> (Option, BufferReceiver) { +) -> (Option, BufferReceiver) +where + T: TimedBufferable, +{ match receiver { None => panic!("invalid to poll future in uninitialized state"), Some(mut receiver) => { diff --git a/lib/vector-buffers/src/topology/channel/sender.rs b/lib/vector-buffers/src/topology/channel/sender.rs index 15191ec02e..bf3f2c4441 100644 --- a/lib/vector-buffers/src/topology/channel/sender.rs +++ b/lib/vector-buffers/src/topology/channel/sender.rs @@ -4,43 +4,55 @@ use async_recursion::async_recursion; use derivative::Derivative; use tokio::sync::Mutex; use tracing::Span; -use vector_common::internal_event::{register, InternalEventHandle, Registered}; +use vector_common::{ + 
byte_size_of::ByteSizeOf, + internal_event::{register, InternalEventHandle, Registered}, +}; use super::limited_queue::LimitedSender; use crate::{ buffer_usage_data::BufferUsageHandle, internal_events::BufferSendDuration, variants::disk_v2::{self, ProductionFilesystem}, - Bufferable, WhenFull, + Clock, EventCount, Timed, TimedBufferable, WhenFull, }; /// Adapter for papering over various sender backends. #[derive(Clone, Debug)] -pub enum SenderAdapter { +pub enum SenderAdapter +where + T: TimedBufferable, +{ /// The in-memory channel buffer. - InMemory(LimitedSender), + InMemory(LimitedSender>), /// The disk v2 buffer. - DiskV2(Arc>>), + DiskV2(Arc, ProductionFilesystem>>>), } -impl From> for SenderAdapter { - fn from(v: LimitedSender) -> Self { +impl From>> for SenderAdapter +where + T: TimedBufferable, +{ + fn from(v: LimitedSender>) -> Self { Self::InMemory(v) } } -impl From> for SenderAdapter { - fn from(v: disk_v2::BufferWriter) -> Self { +impl From, ProductionFilesystem>> for SenderAdapter +where + T: TimedBufferable, +{ + fn from(v: disk_v2::BufferWriter, ProductionFilesystem>) -> Self { Self::DiskV2(Arc::new(Mutex::new(v))) } } impl SenderAdapter where - T: Bufferable, + T: TimedBufferable, { - pub(crate) async fn send(&mut self, item: T) -> crate::Result<()> { + pub(crate) async fn send(&mut self, item: Timed) -> crate::Result<()> { match self { Self::InMemory(tx) => tx.send(item).await.map_err(Into::into), Self::DiskV2(writer) => { @@ -60,7 +72,7 @@ where } } - pub(crate) async fn try_send(&mut self, item: T) -> crate::Result> { + pub(crate) async fn try_send(&mut self, item: Timed) -> crate::Result>> { match self { Self::InMemory(tx) => tx .try_send(item) @@ -130,35 +142,49 @@ where /// `#[async_recursion]` stuff. 
#[derive(Clone, Derivative)] #[derivative(Debug)] -pub struct BufferSender { +pub struct BufferSender +where + T: TimedBufferable, +{ base: SenderAdapter, overflow: Option>>, when_full: WhenFull, instrumentation: Option, #[derivative(Debug = "ignore")] send_duration: Option>, + #[derivative(Debug = "ignore")] + clock: Arc, } -impl BufferSender { +impl BufferSender +where + T: TimedBufferable, +{ /// Creates a new [`BufferSender`] wrapping the given channel sender. - pub fn new(base: SenderAdapter, when_full: WhenFull) -> Self { + pub fn new(base: SenderAdapter, when_full: WhenFull, clock: Arc) -> Self { Self { base, overflow: None, when_full, instrumentation: None, send_duration: None, + clock, } } /// Creates a new [`BufferSender`] wrapping the given channel sender and overflow sender. - pub fn with_overflow(base: SenderAdapter, overflow: BufferSender) -> Self { + pub fn with_overflow( + base: SenderAdapter, + overflow: BufferSender, + clock: Arc, + ) -> Self { Self { base, overflow: Some(Box::new(overflow)), when_full: WhenFull::Overflow, instrumentation: None, send_duration: None, + clock, } } @@ -184,7 +210,10 @@ impl BufferSender { } } -impl BufferSender { +impl BufferSender +where + T: TimedBufferable, +{ #[cfg(test)] pub(crate) fn get_base_ref(&self) -> &SenderAdapter { &self.base @@ -195,8 +224,17 @@ impl BufferSender { self.overflow.as_ref().map(AsRef::as_ref) } - #[async_recursion] pub async fn send(&mut self, item: T, send_reference: Option) -> crate::Result<()> { + let stamped = Timed::stamped(item, &*self.clock); + self.send_stamped(stamped, send_reference).await + } + + #[async_recursion] + async fn send_stamped( + &mut self, + item: Timed, + send_reference: Option, + ) -> crate::Result<()> { let item_sizing = self .instrumentation .as_ref() @@ -217,7 +255,7 @@ impl BufferSender { self.overflow .as_mut() .unwrap_or_else(|| unreachable!("overflow must exist")) - .send(item, send_reference) + .send_stamped(item, send_reference) .await?; } } diff --git 
a/lib/vector-buffers/src/topology/channel/tests.rs b/lib/vector-buffers/src/topology/channel/tests.rs index a1c68c8071..8a1915d77c 100644 --- a/lib/vector-buffers/src/topology/channel/tests.rs +++ b/lib/vector-buffers/src/topology/channel/tests.rs @@ -10,7 +10,7 @@ use crate::{ channel::{BufferReceiver, BufferSender}, test_util::{assert_current_send_capacity, build_buffer}, }, - Bufferable, WhenFull, + TimedBufferable, WhenFull, }; async fn assert_send_ok_with_capacities( @@ -19,7 +19,7 @@ async fn assert_send_ok_with_capacities( base_expected: Option, overflow_expected: Option, ) where - T: Bufferable, + T: TimedBufferable, { assert!(sender.send(value.into(), None).await.is_ok()); assert_current_send_capacity(sender, base_expected, overflow_expected); @@ -31,7 +31,7 @@ async fn blocking_send_and_drain_receiver( send_value: V, ) -> Vec where - T: Bufferable, + T: TimedBufferable, V: Into + From + Send + 'static, { // We can likely replace this with `tokio_test`-related helpers to avoid the sleeping. @@ -69,7 +69,7 @@ where async fn drain_receiver(sender: BufferSender, receiver: BufferReceiver) -> Vec where - T: Bufferable, + T: TimedBufferable, V: From + Send + 'static, { drop(sender); diff --git a/lib/vector-buffers/src/topology/test_util.rs b/lib/vector-buffers/src/topology/test_util.rs index ec0acac741..e36772ef1c 100644 --- a/lib/vector-buffers/src/topology/test_util.rs +++ b/lib/vector-buffers/src/topology/test_util.rs @@ -9,7 +9,7 @@ use crate::{ buffer_usage_data::BufferUsageHandle, encoding::FixedEncodable, topology::channel::{BufferReceiver, BufferSender}, - Bufferable, EventCount, WhenFull, + EventCount, TimedBufferable, WhenFull, }; #[derive(Clone, Copy, Debug, Eq, Ord, PartialEq, PartialOrd)] @@ -132,14 +132,14 @@ pub(crate) async fn build_buffer( } /// Gets the current capacity of the underlying base channel of the given sender. 
-fn get_base_sender_capacity(sender: &BufferSender) -> Option { +fn get_base_sender_capacity(sender: &BufferSender) -> Option { sender.get_base_ref().capacity() } /// Gets the current capacity of the underlying overflow channel of the given sender.. /// /// As overflow is optional, the return value will be `None` is overflow is not configured. -fn get_overflow_sender_capacity(sender: &BufferSender) -> Option { +fn get_overflow_sender_capacity(sender: &BufferSender) -> Option { sender .get_overflow_ref() .and_then(|s| s.get_base_ref().capacity()) @@ -154,7 +154,7 @@ pub fn assert_current_send_capacity( base_expected: Option, overflow_expected: Option, ) where - T: Bufferable, + T: TimedBufferable, { assert_eq!(get_base_sender_capacity(sender), base_expected); assert_eq!(get_overflow_sender_capacity(sender), overflow_expected); diff --git a/lib/vector-buffers/src/variants/disk_v2/mod.rs b/lib/vector-buffers/src/variants/disk_v2/mod.rs index e93738b178..53686fbbe2 100644 --- a/lib/vector-buffers/src/variants/disk_v2/mod.rs +++ b/lib/vector-buffers/src/variants/disk_v2/mod.rs @@ -316,7 +316,7 @@ impl DiskV2Buffer { #[async_trait] impl IntoBuffer for DiskV2Buffer where - T: Bufferable + Clone + Finalizable, + T: crate::TimedBufferable + Finalizable, { fn provides_instrumentation(&self) -> bool { true @@ -326,7 +326,7 @@ where self: Box, usage_handle: BufferUsageHandle, ) -> Result<(SenderAdapter, ReceiverAdapter), Box> { - let (writer, reader) = build_disk_v2_buffer( + let (writer, reader) = build_disk_v2_buffer::>( usage_handle, &self.data_dir, self.id.as_str(), diff --git a/lib/vector-buffers/src/variants/in_memory.rs b/lib/vector-buffers/src/variants/in_memory.rs index fedcf35c2d..91864d10ec 100644 --- a/lib/vector-buffers/src/variants/in_memory.rs +++ b/lib/vector-buffers/src/variants/in_memory.rs @@ -8,7 +8,7 @@ use crate::{ builder::IntoBuffer, channel::{limited, ReceiverAdapter, SenderAdapter}, }, - Bufferable, + TimedBufferable, }; pub struct MemoryBuffer { @@ 
-24,7 +24,7 @@ impl MemoryBuffer { #[async_trait] impl IntoBuffer for MemoryBuffer where - T: Bufferable, + T: TimedBufferable, { async fn into_buffer_parts( self: Box, diff --git a/lib/vector-core/src/fanout.rs b/lib/vector-core/src/fanout.rs index 889d8b6243..230fa95073 100644 --- a/lib/vector-core/src/fanout.rs +++ b/lib/vector-core/src/fanout.rs @@ -487,6 +487,7 @@ mod tests { TopologyBuilder::standalone_memory( NonZeroUsize::new(capacity).expect("capacity must be nonzero"), WhenFull::Block, + "fanout_test_buffer", &Span::current(), ) .await diff --git a/lib/vector-tap/src/controller.rs b/lib/vector-tap/src/controller.rs index 656f77c108..392af320e8 100644 --- a/lib/vector-tap/src/controller.rs +++ b/lib/vector-tap/src/controller.rs @@ -351,7 +351,7 @@ async fn tap_handler( // target for the component, and spawn our transformer task which will // wrap each event payload with the necessary metadata before forwarding // it to our global tap receiver. - let (tap_buffer_tx, mut tap_buffer_rx) = TopologyBuilder::standalone_memory(TAP_BUFFER_SIZE, WhenFull::DropNewest, &Span::current()).await; + let (tap_buffer_tx, mut tap_buffer_rx) = TopologyBuilder::standalone_memory(TAP_BUFFER_SIZE, WhenFull::DropNewest, "tap", &Span::current()).await; let mut tap_transformer = TapTransformer::new(tx.clone(), output.clone()); tokio::spawn(async move { diff --git a/src/topology/builder.rs b/src/topology/builder.rs index 86460af82c..8db8a17b90 100644 --- a/src/topology/builder.rs +++ b/src/topology/builder.rs @@ -523,9 +523,13 @@ impl<'a> Builder<'a> { Ok(transform) => transform, }; - let (input_tx, input_rx) = - TopologyBuilder::standalone_memory(TOPOLOGY_BUFFER_SIZE, WhenFull::Block, &span) - .await; + let (input_tx, input_rx) = TopologyBuilder::standalone_memory( + TOPOLOGY_BUFFER_SIZE, + WhenFull::Block, + key.clone(), + &span, + ) + .await; self.inputs .insert(key.clone(), (input_tx, node.inputs.clone())); @@ -587,7 +591,7 @@ impl<'a> Builder<'a> { .buffer .build( 
self.config.global.data_dir.clone(), - key.to_string(), + key.clone(), buffer_span, ) .await; From 11c0545c6572c74d5cd64be8770bde6c4d1b021e Mon Sep 17 00:00:00 2001 From: Janmejay Singh Date: Fri, 12 Jun 2026 21:29:42 +0530 Subject: [PATCH 4/9] Feat[OBE-10264] - Add basic tests for topology buffers and queue-delay reporting --- Cargo.lock | 1 + lib/vector-buffers/src/config.rs | 23 +- lib/vector-buffers/src/topology/builder.rs | 17 +- .../src/topology/channel/sender.rs | 17 ++ lib/vector-core/Cargo.toml | 2 + .../tests/buffer_disk_v2_compat.rs | 205 ++++++++++++++++++ .../tests/buffer_histogram_demo.rs | 167 ++++++++++++++ 7 files changed, 430 insertions(+), 2 deletions(-) create mode 100644 lib/vector-core/tests/buffer_disk_v2_compat.rs create mode 100644 lib/vector-core/tests/buffer_histogram_demo.rs diff --git a/Cargo.lock b/Cargo.lock index 008ccfe441..e65065fc62 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12791,6 +12791,7 @@ dependencies = [ "snafu 0.7.5", "snap", "socket2 0.5.8", + "tempfile", "tokio", "tokio-openssl", "tokio-stream", diff --git a/lib/vector-buffers/src/config.rs b/lib/vector-buffers/src/config.rs index 07d3188795..f536c3eac8 100644 --- a/lib/vector-buffers/src/config.rs +++ b/lib/vector-buffers/src/config.rs @@ -369,6 +369,27 @@ impl BufferConfig { id: impl Into, span: Span, ) -> Result<(BufferSender, BufferReceiver), BufferBuildError> + where + T: crate::TimedBufferable + Finalizable, + { + self.build_with_clock(data_dir, id, span, std::sync::Arc::new(crate::SystemClock)) + .await + } + + /// Like [`build`](Self::build) but accepts an explicit clock, primarily for tests that want + /// deterministic queue-delay measurements. + /// + /// # Errors + /// + /// Same as [`build`](Self::build). 
+ #[allow(clippy::needless_pass_by_value)] + pub async fn build_with_clock( + &self, + data_dir: Option, + id: impl Into, + span: Span, + clock: std::sync::Arc, + ) -> Result<(BufferSender, BufferReceiver), BufferBuildError> where T: crate::TimedBufferable + Finalizable, { @@ -380,7 +401,7 @@ impl BufferConfig { } builder - .build(component_key, span) + .build_with_clock(component_key, span, clock) .await .context(FailedToBuildTopologySnafu) } diff --git a/lib/vector-buffers/src/topology/builder.rs b/lib/vector-buffers/src/topology/builder.rs index ff4fd3c57d..828de43784 100644 --- a/lib/vector-buffers/src/topology/builder.rs +++ b/lib/vector-buffers/src/topology/builder.rs @@ -122,9 +122,24 @@ where self, id: impl Into, span: Span, + ) -> Result<(BufferSender, BufferReceiver), TopologyError> { + self.build_with_clock(id, span, Arc::new(SystemClock)) + .await + } + + /// Like [`build`](Self::build) but accepts an explicit clock, primarily for tests that want + /// deterministic queue-delay measurements. + /// + /// # Errors + /// + /// Same as [`build`](Self::build). + pub async fn build_with_clock( + self, + id: impl Into, + span: Span, + clock: Arc, ) -> Result<(BufferSender, BufferReceiver), TopologyError> { let component_id = id.into().into_id(); - let clock: Arc = Arc::new(SystemClock); // We pop stages off in reverse order to build from the inside out. 
let mut buffer_usage = BufferUsage::from_span(span.clone()); diff --git a/lib/vector-buffers/src/topology/channel/sender.rs b/lib/vector-buffers/src/topology/channel/sender.rs index bf3f2c4441..a850cc06fe 100644 --- a/lib/vector-buffers/src/topology/channel/sender.rs +++ b/lib/vector-buffers/src/topology/channel/sender.rs @@ -110,6 +110,15 @@ where } } + pub(crate) async fn close(&mut self) { + match self { + Self::InMemory(_) => {} + Self::DiskV2(writer) => { + writer.lock().await.close(); + } + } + } + pub fn capacity(&self) -> Option { match self { Self::InMemory(tx) => Some(tx.available_capacity()), @@ -300,4 +309,12 @@ where Ok(()) } + + #[async_recursion] + pub async fn close(&mut self) { + self.base.close().await; + if let Some(overflow) = self.overflow.as_mut() { + overflow.close().await; + } + } } diff --git a/lib/vector-core/Cargo.toml b/lib/vector-core/Cargo.toml index e5c1f9e583..3f4208c00a 100644 --- a/lib/vector-core/Cargo.toml +++ b/lib/vector-core/Cargo.toml @@ -93,10 +93,12 @@ base64 = "0.22.1" chrono-tz.workspace = true criterion = { version = "0.5.1", features = ["html_reports"] } env-test-util = "1.0.1" +metrics-util = { workspace = true, features = ["debugging"] } quickcheck = "1" quickcheck_macros = "1" proptest = "1.5" similar-asserts = "1.6.0" +tempfile = "3.15.0" tokio-test = "0.4.4" toml.workspace = true ndarray = "0.16.1" diff --git a/lib/vector-core/tests/buffer_disk_v2_compat.rs b/lib/vector-core/tests/buffer_disk_v2_compat.rs new file mode 100644 index 0000000000..2d21dfc9b0 --- /dev/null +++ b/lib/vector-core/tests/buffer_disk_v2_compat.rs @@ -0,0 +1,205 @@ +//! Integration tests for disk_v2 buffer backwards compatibility with the `enq_tm` proto field. +//! +//! T2: a buffer written with the new (`Timed`-wrapping) topology round-trips correctly. +//! T3: legacy records written before `enq_tm` existed still decode through the new buffer path. +//! Uses the checked-in fixture at `tests/data/fixtures/legacy_disk_buffer_v2/`. +//! 
T4a: a buffer containing both legacy records and freshly-written ones drains in FIFO order +//! with all payloads intact. +//! +//! Histogram emission semantics are verified separately — see `buffer_histogram_demo.rs`. + +#[cfg(test)] +mod tests { + use std::{ + num::NonZeroU64, + path::{Path, PathBuf}, + }; + + use tempfile::TempDir; + use tracing::Span; + use vector_buffers::{ + topology::channel::{BufferReceiver, BufferSender}, + BufferConfig, BufferType, WhenFull, + }; + use vector_common::finalization::{EventStatus, Finalizable}; + use vector_core::event::{EventArray, LogEvent, Metric, MetricKind, MetricValue}; + + const FIXTURE_SUBDIR: &str = "tests/data/fixtures/legacy_disk_buffer_v2"; + const BUFFER_ID: &str = "legacy"; + const MAX_SIZE: u64 = 268_435_488; + + fn fixture_root() -> PathBuf { + PathBuf::from(env!("CARGO_MANIFEST_DIR")).join(FIXTURE_SUBDIR) + } + + fn copy_dir_recursive(src: &Path, dst: &Path) -> std::io::Result<()> { + std::fs::create_dir_all(dst)?; + for entry in std::fs::read_dir(src)? 
{ + let entry = entry?; + let new_path = dst.join(entry.file_name()); + if entry.file_type()?.is_dir() { + copy_dir_recursive(&entry.path(), &new_path)?; + } else { + std::fs::copy(entry.path(), &new_path)?; + } + } + Ok(()) + } + + fn disk_config() -> BufferConfig { + BufferConfig::Single(BufferType::DiskV2 { + when_full: WhenFull::Block, + max_size: NonZeroU64::new(MAX_SIZE).unwrap(), + }) + } + + async fn open_buffer( + data_dir: PathBuf, + ) -> (BufferSender, BufferReceiver) { + disk_config() + .build::(Some(data_dir), BUFFER_ID, Span::none()) + .await + .expect("build buffer") + } + + fn sample_logs(message: &str, level: &str) -> EventArray { + let mut log = LogEvent::default(); + log.insert("message", message); + log.insert("level", level); + EventArray::Logs(vec![log]) + } + + fn sample_metric(name: &str, value: f64) -> EventArray { + EventArray::Metrics(vec![Metric::new( + name, + MetricKind::Incremental, + MetricValue::Counter { value }, + )]) + } + + fn log_message(ea: &EventArray) -> Option { + match ea { + EventArray::Logs(logs) => logs.first().and_then(|l| { + l.get("message") + .and_then(|v| v.as_bytes()) + .map(|b| String::from_utf8_lossy(b).into_owned()) + }), + _ => None, + } + } + + fn metric_name(ea: &EventArray) -> Option { + match ea { + EventArray::Metrics(m) => m.first().map(|m| m.name().to_string()), + _ => None, + } + } + + #[tokio::test] + async fn t2_disk_buffer_new_proto_roundtrip() { + let tmp = TempDir::new().expect("tempdir"); + + let (mut sender, mut receiver) = open_buffer(tmp.path().to_path_buf()).await; + sender + .send(sample_logs("alpha", "info"), None) + .await + .expect("send"); + sender + .send(sample_logs("beta", "warn"), None) + .await + .expect("send"); + sender + .send(sample_metric("c", 7.0), None) + .await + .expect("send"); + sender.flush().await.expect("flush"); + sender.close().await; + drop(sender); + + let mut drained = Vec::new(); + while let Some(mut ea) = receiver.next().await { + 
ea.take_finalizers().update_status(EventStatus::Delivered); + tokio::task::yield_now().await; + drained.push(ea); + } + + assert_eq!(drained.len(), 3); + assert_eq!(log_message(&drained[0]).as_deref(), Some("alpha")); + assert_eq!(log_message(&drained[1]).as_deref(), Some("beta")); + assert_eq!(metric_name(&drained[2]).as_deref(), Some("c")); + } + + #[tokio::test] + async fn t3_legacy_fixture_decodes_through_new_buffer() { + let tmp = TempDir::new().expect("tempdir"); + copy_dir_recursive(&fixture_root(), tmp.path()).expect("copy fixture"); + + let (mut sender, mut receiver) = open_buffer(tmp.path().to_path_buf()).await; + sender.close().await; + drop(sender); + + let mut drained = Vec::new(); + while let Some(mut ea) = receiver.next().await { + ea.take_finalizers().update_status(EventStatus::Delivered); + tokio::task::yield_now().await; + drained.push(ea); + } + + assert_eq!(drained.len(), 2, "fixture has 2 records"); + + let logs = match &drained[0] { + EventArray::Logs(logs) => logs, + other => panic!("expected Logs, got {other:?}"), + }; + assert_eq!(logs.len(), 2); + assert_eq!( + logs[0] + .get("message") + .and_then(|v| v.as_bytes()) + .map(|b| String::from_utf8_lossy(b).into_owned()), + Some("hello".to_string()) + ); + assert_eq!( + logs[1] + .get("message") + .and_then(|v| v.as_bytes()) + .map(|b| String::from_utf8_lossy(b).into_owned()), + Some("world".to_string()) + ); + + assert_eq!(metric_name(&drained[1]).as_deref(), Some("test_counter")); + } + + #[tokio::test] + async fn t4a_mixed_legacy_and_new_records_drain_in_order() { + let tmp = TempDir::new().expect("tempdir"); + copy_dir_recursive(&fixture_root(), tmp.path()).expect("copy fixture"); + + let (mut sender, mut receiver) = open_buffer(tmp.path().to_path_buf()).await; + + sender + .send(sample_logs("new1", "info"), None) + .await + .expect("send"); + sender + .send(sample_metric("new_counter", 99.0), None) + .await + .expect("send"); + sender.flush().await.expect("flush"); + sender.close().await; 
+ drop(sender); + + let mut drained = Vec::new(); + while let Some(mut ea) = receiver.next().await { + ea.take_finalizers().update_status(EventStatus::Delivered); + tokio::task::yield_now().await; + drained.push(ea); + } + + assert_eq!(drained.len(), 4, "2 legacy + 2 new"); + assert_eq!(log_message(&drained[0]).as_deref(), Some("hello")); + assert_eq!(metric_name(&drained[1]).as_deref(), Some("test_counter")); + assert_eq!(log_message(&drained[2]).as_deref(), Some("new1")); + assert_eq!(metric_name(&drained[3]).as_deref(), Some("new_counter")); + } +} diff --git a/lib/vector-core/tests/buffer_histogram_demo.rs b/lib/vector-core/tests/buffer_histogram_demo.rs new file mode 100644 index 0000000000..cff618f99b --- /dev/null +++ b/lib/vector-core/tests/buffer_histogram_demo.rs @@ -0,0 +1,167 @@ +//! Demonstrates that `topology_queue_delay_seconds` is updated with the actual queue residency, +//! for both in-memory and disk-backed buffers, using a deterministic clock driven by tokio's +//! paused-time machinery. This is the single place where we verify the histogram values; other +//! buffer-level tests stay focused on payload behavior. + +#[cfg(test)] +mod tests { + use std::{ + num::{NonZeroU64, NonZeroUsize}, + sync::Arc, + time::{Duration, SystemTime}, + }; + + use metrics_util::debugging::{DebugValue, DebuggingRecorder, Snapshotter}; + use tempfile::TempDir; + use tokio::time::Instant as TokioInstant; + use tracing::Span; + use vector_buffers::{BufferConfig, BufferType, Clock, WhenFull}; + use vector_common::finalization::{EventStatus, Finalizable}; + use vector_core::event::{EventArray, LogEvent}; + + const QUEUE_DELAY_METRIC: &str = "topology_queue_delay_seconds"; + const MAX_SIZE: u64 = 268_435_488; + const SLEEP: Duration = Duration::from_millis(100); + + /// `Clock` whose `now()` tracks tokio's (potentially paused) time, offset from an anchor. 
+ /// With paused tokio time, `tokio::time::sleep(d).await` advances both tokio time and `now()` + /// by exactly `d`, without any real wall-clock wait. + struct TokioBackedClock { + anchor_sys: SystemTime, + anchor_tokio: TokioInstant, + } + + impl TokioBackedClock { + fn new() -> Self { + Self { + anchor_sys: SystemTime::UNIX_EPOCH + Duration::from_secs(1_700_000_000), + anchor_tokio: TokioInstant::now(), + } + } + } + + impl Clock for TokioBackedClock { + fn now(&self) -> SystemTime { + self.anchor_sys + TokioInstant::now().duration_since(self.anchor_tokio) + } + } + + fn sample_logs(message: &str) -> EventArray { + let mut log = LogEvent::default(); + log.insert("message", message); + EventArray::Logs(vec![log]) + } + + fn queue_delay_samples(snapshotter: &Snapshotter) -> Vec { + snapshotter + .snapshot() + .into_vec() + .into_iter() + .filter_map(|(key, _unit, _desc, value)| { + if key.key().name() != QUEUE_DELAY_METRIC { + return None; + } + match value { + DebugValue::Histogram(samples) => { + Some(samples.into_iter().map(f64::from).collect::>()) + } + _ => None, + } + }) + .flatten() + .collect() + } + + #[tokio::test(flavor = "current_thread", start_paused = true)] + async fn histogram_emits_for_in_memory_buffer() { + let recorder = DebuggingRecorder::new(); + let snapshotter = recorder.snapshotter(); + let _guard = metrics::set_default_local_recorder(&recorder); + + let clock: Arc = Arc::new(TokioBackedClock::new()); + let config = BufferConfig::Single(BufferType::Memory { + when_full: WhenFull::Block, + max_events: NonZeroUsize::new(16).unwrap(), + }); + let (mut sender, mut receiver) = config + .build_with_clock::(None, "histogram_demo_mem", Span::none(), clock) + .await + .expect("build"); + + sender.send(sample_logs("a"), None).await.expect("send"); + sender.send(sample_logs("b"), None).await.expect("send"); + sender.send(sample_logs("c"), None).await.expect("send"); + + tokio::time::sleep(SLEEP).await; + + drop(sender); + let mut drained = 
Vec::new(); + while let Some(mut ea) = receiver.next().await { + ea.take_finalizers().update_status(EventStatus::Delivered); + tokio::task::yield_now().await; + drained.push(ea); + } + assert_eq!(drained.len(), 3); + + let samples = queue_delay_samples(&snapshotter); + assert_eq!(samples.len(), 3, "one sample per drained record"); + let expected = SLEEP.as_secs_f64(); + for (i, s) in samples.iter().enumerate() { + assert!( + (*s - expected).abs() < 1e-6, + "sample[{i}] = {s} should equal {expected}" + ); + } + } + + #[tokio::test(flavor = "current_thread", start_paused = true)] + async fn histogram_emits_for_disk_buffer() { + let tmp = TempDir::new().expect("tempdir"); + + let recorder = DebuggingRecorder::new(); + let snapshotter = recorder.snapshotter(); + let _guard = metrics::set_default_local_recorder(&recorder); + + let clock: Arc = Arc::new(TokioBackedClock::new()); + let config = BufferConfig::Single(BufferType::DiskV2 { + when_full: WhenFull::Block, + max_size: NonZeroU64::new(MAX_SIZE).unwrap(), + }); + let (mut sender, mut receiver) = config + .build_with_clock::( + Some(tmp.path().to_path_buf()), + "histogram_demo_disk", + Span::none(), + clock, + ) + .await + .expect("build"); + + sender.send(sample_logs("a"), None).await.expect("send"); + sender.send(sample_logs("b"), None).await.expect("send"); + sender.send(sample_logs("c"), None).await.expect("send"); + sender.flush().await.expect("flush"); + + tokio::time::sleep(SLEEP).await; + + sender.close().await; + drop(sender); + let mut drained = Vec::new(); + while let Some(mut ea) = receiver.next().await { + ea.take_finalizers().update_status(EventStatus::Delivered); + tokio::task::yield_now().await; + drained.push(ea); + } + assert_eq!(drained.len(), 3); + + let samples = queue_delay_samples(&snapshotter); + assert_eq!(samples.len(), 3, "one sample per drained record"); + let expected = SLEEP.as_secs_f64(); + for (i, s) in samples.iter().enumerate() { + assert!( + (*s - expected).abs() < 1e-6, + 
"sample[{i}] = {s} should equal {expected}" + ); + } + } +} From bf9af5d36e3a1aa16dca13e620225d14c76584d9 Mon Sep 17 00:00:00 2001 From: Janmejay Singh Date: Fri, 12 Jun 2026 22:51:27 +0530 Subject: [PATCH 5/9] Feat[OBE-10264] - Add top-level topology-build test for queue-delay reporting --- .../tests/buffer_histogram_demo.rs | 101 ++++++++++++++++++ src/topology/test/mod.rs | 6 ++ src/topology/test/queue_delay.rs | 78 ++++++++++++++ 3 files changed, 185 insertions(+) create mode 100644 src/topology/test/queue_delay.rs diff --git a/lib/vector-core/tests/buffer_histogram_demo.rs b/lib/vector-core/tests/buffer_histogram_demo.rs index cff618f99b..5f6264763e 100644 --- a/lib/vector-core/tests/buffer_histogram_demo.rs +++ b/lib/vector-core/tests/buffer_histogram_demo.rs @@ -6,6 +6,7 @@ #[cfg(test)] mod tests { use std::{ + collections::HashMap, num::{NonZeroU64, NonZeroUsize}, sync::Arc, time::{Duration, SystemTime}, @@ -72,6 +73,28 @@ mod tests { .collect() } + fn queue_delay_samples_by_stage(snapshotter: &Snapshotter) -> HashMap> { + let mut by_stage: HashMap> = HashMap::new(); + for (key, _unit, _desc, value) in snapshotter.snapshot().into_vec() { + if key.key().name() != QUEUE_DELAY_METRIC { + continue; + } + let stage = key + .key() + .labels() + .find(|l| l.key() == "stage") + .map(|l| l.value().to_string()) + .unwrap_or_default(); + if let DebugValue::Histogram(samples) = value { + by_stage + .entry(stage) + .or_default() + .extend(samples.into_iter().map(f64::from)); + } + } + by_stage + } + #[tokio::test(flavor = "current_thread", start_paused = true)] async fn histogram_emits_for_in_memory_buffer() { let recorder = DebuggingRecorder::new(); @@ -164,4 +187,82 @@ mod tests { ); } } + + // T7: a two-stage (memory base + disk overflow) buffer should emit histogram samples for + // both stages, and each sample should reflect total queue residency since the original send + // (the overflow path must preserve `enq_tm`, not re-stamp). 
+ #[tokio::test(flavor = "current_thread", start_paused = true)] + async fn histogram_emits_per_stage_for_overflow_buffer() { + let tmp = TempDir::new().expect("tempdir"); + + let recorder = DebuggingRecorder::new(); + let snapshotter = recorder.snapshotter(); + let _guard = metrics::set_default_local_recorder(&recorder); + + let clock: Arc = Arc::new(TokioBackedClock::new()); + let config = BufferConfig::Chained(vec![ + BufferType::Memory { + when_full: WhenFull::Overflow, + max_events: NonZeroUsize::new(1).unwrap(), + }, + BufferType::DiskV2 { + when_full: WhenFull::Block, + max_size: NonZeroU64::new(MAX_SIZE).unwrap(), + }, + ]); + let (mut sender, mut receiver) = config + .build_with_clock::( + Some(tmp.path().to_path_buf()), + "histogram_demo_overflow", + Span::none(), + clock, + ) + .await + .expect("build"); + + sender.send(sample_logs("a"), None).await.expect("send"); + sender.send(sample_logs("b"), None).await.expect("send"); + sender.send(sample_logs("c"), None).await.expect("send"); + sender.flush().await.expect("flush"); + + tokio::time::sleep(SLEEP).await; + + sender.close().await; + drop(sender); + + let mut drained = Vec::new(); + while let Some(mut ea) = receiver.next().await { + ea.take_finalizers().update_status(EventStatus::Delivered); + tokio::task::yield_now().await; + drained.push(ea); + } + assert_eq!(drained.len(), 3); + + let by_stage = queue_delay_samples_by_stage(&snapshotter); + let stage_0 = by_stage.get("0").cloned().unwrap_or_default(); + let stage_1 = by_stage.get("1").cloned().unwrap_or_default(); + + assert!( + !stage_0.is_empty(), + "expected at least one sample for stage=0 (memory base)" + ); + assert!( + !stage_1.is_empty(), + "expected at least one sample for stage=1 (disk overflow)" + ); + assert_eq!( + stage_0.len() + stage_1.len(), + 3, + "total samples across stages should match drained record count" + ); + + let expected = SLEEP.as_secs_f64(); + for s in stage_0.iter().chain(stage_1.iter()) { + assert!( + (*s - 
expected).abs() < 1e-6, + "every sample (memory or overflow) should reflect total residency: \ + {s} != {expected}" + ); + } + } } diff --git a/src/topology/test/mod.rs b/src/topology/test/mod.rs index 9a93be332a..28727c60dc 100644 --- a/src/topology/test/mod.rs +++ b/src/topology/test/mod.rs @@ -32,6 +32,12 @@ use vector_lib::config::OutputId; mod backpressure; mod compliance; +#[cfg(all( + feature = "sources-demo_logs", + feature = "transforms-remap", + feature = "sinks-blackhole" +))] +mod queue_delay; #[cfg(all(feature = "sinks-socket", feature = "sources-socket"))] mod crash; mod doesnt_reload; diff --git a/src/topology/test/queue_delay.rs b/src/topology/test/queue_delay.rs new file mode 100644 index 0000000000..7dfc2a9ff8 --- /dev/null +++ b/src/topology/test/queue_delay.rs @@ -0,0 +1,78 @@ +//! End-to-end test that `topology_queue_delay_seconds` is emitted with the consumer's +//! `component_id` for every buffered arrow, going through the TOML config -> builder -> +//! running-topology path (`demo_logs` -> `remap` -> `remap` -> `blackhole`). + +use std::{collections::HashMap, time::Duration}; + +use tokio::time::sleep; +use vector_lib::metrics::Controller; + +use crate::{ + config::ConfigBuilder, + event::MetricValue, + test_util::{start_topology, trace_init}, +}; + +const QUEUE_DELAY_METRIC: &str = "topology_queue_delay_seconds"; + +const CONFIG: &str = r#" +[sources.in] +type = "demo_logs" +format = "shuffle" +lines = ["queue-delay-test-line"] +count = 5 +interval = 0.0 + +[transforms.t1] +type = "remap" +inputs = ["in"] +source = "." + +[transforms.t2] +type = "remap" +inputs = ["t1"] +source = "." 
+ +[sinks.out] +type = "blackhole" +inputs = ["t2"] +"#; + +#[tokio::test(flavor = "current_thread")] +async fn histograms_emitted_for_every_buffered_arrow() { + trace_init(); + + let config = ConfigBuilder::from_toml(CONFIG) + .build() + .expect("build config"); + let (topology, _shutdown) = start_topology(config, false).await; + + // demo_logs emits 5 events with interval=0 then closes. A short real-time sleep is enough + // for the events to drain through three buffers on the current-thread runtime. + sleep(Duration::from_millis(200)).await; + + let mut sample_counts: HashMap = HashMap::new(); + for metric in Controller::get().unwrap().capture_metrics() { + if metric.name() != QUEUE_DELAY_METRIC { + continue; + } + let Some(tags) = metric.tags() else { continue }; + let Some(cid) = tags.get("component_id") else { + continue; + }; + if let MetricValue::AggregatedHistogram { count, .. } = metric.value() { + *sample_counts.entry(cid.to_string()).or_default() += count; + } + } + + for cid in &["t1", "t2", "out"] { + let count = sample_counts.get(*cid).copied().unwrap_or(0); + assert!( + count > 0, + "expected at least one histogram sample for component_id={cid}, got {count}. 
\ + Captured counts: {sample_counts:?}" + ); + } + + topology.stop().await; +} From 6c923ff5f74565a0a2165d963094e6efca372a18 Mon Sep 17 00:00:00 2001 From: Janmejay Singh Date: Fri, 12 Jun 2026 23:06:33 +0530 Subject: [PATCH 6/9] Chore[OBE-10264] - Make `buffer_send_duration_seconds` mandatory while creating sender --- lib/vector-buffers/src/topology/builder.rs | 26 ++++++++++++----- .../src/topology/channel/sender.rs | 29 +++++++++---------- 2 files changed, 32 insertions(+), 23 deletions(-) diff --git a/lib/vector-buffers/src/topology/builder.rs b/lib/vector-buffers/src/topology/builder.rs index 828de43784..d1afa82720 100644 --- a/lib/vector-buffers/src/topology/builder.rs +++ b/lib/vector-buffers/src/topology/builder.rs @@ -8,7 +8,7 @@ use vector_common::{config::ComponentKey, internal_event::register}; use super::channel::{ReceiverAdapter, SenderAdapter}; use crate::{ buffer_usage_data::{BufferUsage, BufferUsageHandle}, - internal_events::BufferQueueDelay, + internal_events::{BufferQueueDelay, BufferSendDuration}, topology::channel::{BufferReceiver, BufferSender}, variants::MemoryBuffer, Clock, SystemClock, TimedBufferable, WhenFull, @@ -179,14 +179,23 @@ where component_id: component_id.clone(), stage: stage_idx, }); + let send_duration = { + let _enter = span.enter(); + register(BufferSendDuration { stage: stage_idx }) + }; let (mut sender, mut receiver) = match current_stage.take() { None => ( - BufferSender::new(sender, stage.when_full, Arc::clone(&clock)), + BufferSender::new(sender, stage.when_full, Arc::clone(&clock), send_duration), BufferReceiver::new(receiver, queue_delay, Arc::clone(&clock)), ), Some((current_sender, current_receiver)) => ( - BufferSender::with_overflow(sender, current_sender, Arc::clone(&clock)), + BufferSender::with_overflow( + sender, + current_sender, + Arc::clone(&clock), + send_duration, + ), BufferReceiver::with_overflow( receiver, current_receiver, @@ -196,7 +205,6 @@ where ), }; - 
sender.with_send_duration_instrumentation(stage_idx, &span); if !provides_instrumentation { sender.with_usage_instrumentation(usage_handle.clone()); receiver.with_usage_instrumentation(usage_handle); @@ -251,8 +259,11 @@ where WhenFull::Overflow => WhenFull::Block, m => m, }; - let mut sender = BufferSender::new(sender, mode, Arc::clone(&clock)); - sender.with_send_duration_instrumentation(0, receiver_span); + let send_duration = { + let _enter = receiver_span.enter(); + register(BufferSendDuration { stage: 0 }) + }; + let sender = BufferSender::new(sender, mode, Arc::clone(&clock), send_duration); let queue_delay = register(BufferQueueDelay { component_id, stage: 0, @@ -296,7 +307,8 @@ where component_id: String::from(""), stage: 0, }); - let mut sender = BufferSender::new(sender, mode, Arc::clone(&clock)); + let send_duration = register(BufferSendDuration { stage: 0 }); + let mut sender = BufferSender::new(sender, mode, Arc::clone(&clock), send_duration); let mut receiver = BufferReceiver::new(receiver, queue_delay, clock); sender.with_usage_instrumentation(usage_handle.clone()); diff --git a/lib/vector-buffers/src/topology/channel/sender.rs b/lib/vector-buffers/src/topology/channel/sender.rs index a850cc06fe..3e0f5a91b5 100644 --- a/lib/vector-buffers/src/topology/channel/sender.rs +++ b/lib/vector-buffers/src/topology/channel/sender.rs @@ -3,10 +3,9 @@ use std::{sync::Arc, time::Instant}; use async_recursion::async_recursion; use derivative::Derivative; use tokio::sync::Mutex; -use tracing::Span; use vector_common::{ byte_size_of::ByteSizeOf, - internal_event::{register, InternalEventHandle, Registered}, + internal_event::{InternalEventHandle, Registered}, }; use super::limited_queue::LimitedSender; @@ -160,7 +159,7 @@ where when_full: WhenFull, instrumentation: Option, #[derivative(Debug = "ignore")] - send_duration: Option>, + send_duration: Registered, #[derivative(Debug = "ignore")] clock: Arc, } @@ -170,13 +169,18 @@ where T: TimedBufferable, { /// Creates 
a new [`BufferSender`] wrapping the given channel sender. - pub fn new(base: SenderAdapter, when_full: WhenFull, clock: Arc) -> Self { + pub fn new( + base: SenderAdapter, + when_full: WhenFull, + clock: Arc, + send_duration: Registered, + ) -> Self { Self { base, overflow: None, when_full, instrumentation: None, - send_duration: None, + send_duration, clock, } } @@ -186,13 +190,14 @@ where base: SenderAdapter, overflow: BufferSender, clock: Arc, + send_duration: Registered, ) -> Self { Self { base, overflow: Some(Box::new(overflow)), when_full: WhenFull::Overflow, instrumentation: None, - send_duration: None, + send_duration, clock, } } @@ -211,12 +216,6 @@ where pub fn with_usage_instrumentation(&mut self, handle: BufferUsageHandle) { self.instrumentation = Some(handle); } - - /// Configures this sender to instrument the send duration. - pub fn with_send_duration_instrumentation(&mut self, stage: usize, span: &Span) { - let _enter = span.enter(); - self.send_duration = Some(register(BufferSendDuration { stage })); - } } impl BufferSender @@ -271,10 +270,8 @@ where }; if sent_to_base || was_dropped { - if let (Some(send_duration), Some(send_reference)) = - (self.send_duration.as_ref(), send_reference) - { - send_duration.emit(send_reference.elapsed()); + if let Some(send_reference) = send_reference { + self.send_duration.emit(send_reference.elapsed()); } } From f8e72aea9d69dbf556d3dbf6450689dbaf8a712e Mon Sep 17 00:00:00 2001 From: Janmejay Singh Date: Sat, 13 Jun 2026 00:01:46 +0530 Subject: [PATCH 7/9] Feat[OBE-10264] - Improve tags on queue-residency metric + fix the build --- lib/vector-buffers/examples/buffer_perf.rs | 4 +-- lib/vector-buffers/src/internal_events.rs | 6 +--- lib/vector-buffers/src/topology/builder.rs | 39 ++++++++++++---------- 3 files changed, 24 insertions(+), 25 deletions(-) diff --git a/lib/vector-buffers/examples/buffer_perf.rs b/lib/vector-buffers/examples/buffer_perf.rs index c571c8b74e..8527259c8f 100644 --- 
a/lib/vector-buffers/examples/buffer_perf.rs +++ b/lib/vector-buffers/examples/buffer_perf.rs @@ -21,7 +21,7 @@ use vector_buffers::{ builder::TopologyBuilder, channel::{BufferReceiver, BufferSender}, }, - BufferType, Bufferable, EventCount, WhenFull, + BufferType, EventCount, TimedBufferable, WhenFull, }; use vector_common::byte_size_of::ByteSizeOf; use vector_common::finalization::{ @@ -244,7 +244,7 @@ fn generate_record_cache(min: usize, max: usize) -> Vec { async fn generate_buffer(buffer_type: &str) -> (BufferSender, BufferReceiver) where - T: Bufferable + Clone + Finalizable, + T: TimedBufferable + Finalizable, { let data_dir = PathBuf::from("/tmp/vector"); let id = format!("{}-buffer-perf-testing", buffer_type); diff --git a/lib/vector-buffers/src/internal_events.rs b/lib/vector-buffers/src/internal_events.rs index 547f6bb19f..ca6570fd9a 100644 --- a/lib/vector-buffers/src/internal_events.rs +++ b/lib/vector-buffers/src/internal_events.rs @@ -140,13 +140,9 @@ registered_event! { registered_event! 
{ BufferQueueDelay { - component_id: String, stage: usize, } => { - queue_delay: Histogram = histogram!( - "topology_queue_delay_seconds", - "component_id" => self.component_id.clone(), - "stage" => self.stage.to_string()), + queue_delay: Histogram = histogram!("topology_queue_delay_seconds", "stage" => self.stage.to_string()), } fn emit(&self, duration: Duration) { diff --git a/lib/vector-buffers/src/topology/builder.rs b/lib/vector-buffers/src/topology/builder.rs index d1afa82720..3270bdc0e1 100644 --- a/lib/vector-buffers/src/topology/builder.rs +++ b/lib/vector-buffers/src/topology/builder.rs @@ -2,7 +2,7 @@ use std::{error::Error, num::NonZeroUsize, sync::Arc}; use async_trait::async_trait; use snafu::{ResultExt, Snafu}; -use tracing::Span; +use tracing::{error_span, Span}; use vector_common::{config::ComponentKey, internal_event::register}; use super::channel::{ReceiverAdapter, SenderAdapter}; @@ -175,13 +175,12 @@ where .await .context(FailedToBuildStageSnafu { stage_idx })?; - let queue_delay = register(BufferQueueDelay { - component_id: component_id.clone(), - stage: stage_idx, - }); - let send_duration = { + let (send_duration, queue_delay) = { let _enter = span.enter(); - register(BufferSendDuration { stage: stage_idx }) + ( + register(BufferSendDuration { stage: stage_idx }), + register(BufferQueueDelay { stage: stage_idx }), + ) }; let (mut sender, mut receiver) = match current_stage.take() { @@ -245,7 +244,7 @@ where id: impl Into, receiver_span: &Span, ) -> (BufferSender, BufferReceiver) { - let component_id = id.into().into_id(); + let component_key = id.into(); let clock: Arc = Arc::new(SystemClock); let usage_handle = BufferUsageHandle::noop(); @@ -259,15 +258,22 @@ where WhenFull::Overflow => WhenFull::Block, m => m, }; - let send_duration = { + let buffer_span = { let _enter = receiver_span.enter(); - register(BufferSendDuration { stage: 0 }) + error_span!( + "standalone_memory", + component_id = %component_key.id(), + buffer_type = "memory", + 
) + }; + let (send_duration, queue_delay) = { + let _enter = buffer_span.enter(); + ( + register(BufferSendDuration { stage: 0 }), + register(BufferQueueDelay { stage: 0 }), + ) }; let sender = BufferSender::new(sender, mode, Arc::clone(&clock), send_duration); - let queue_delay = register(BufferQueueDelay { - component_id, - stage: 0, - }); let receiver = BufferReceiver::new(receiver, queue_delay, clock); (sender, receiver) @@ -303,11 +309,8 @@ where WhenFull::Overflow => WhenFull::Block, m => m, }; - let queue_delay = register(BufferQueueDelay { - component_id: String::from(""), - stage: 0, - }); let send_duration = register(BufferSendDuration { stage: 0 }); + let queue_delay = register(BufferQueueDelay { stage: 0 }); let mut sender = BufferSender::new(sender, mode, Arc::clone(&clock), send_duration); let mut receiver = BufferReceiver::new(receiver, queue_delay, clock); From 22d2e3c272b1f87a2807bd5fe0eb51f091161d99 Mon Sep 17 00:00:00 2001 From: Janmejay Singh Date: Wed, 17 Jun 2026 18:42:06 +0530 Subject: [PATCH 8/9] Feat[OBE-10264] - Implement Histogram->Summary transform to avoid cardinality explosion --- Cargo.toml | 2 + lib/vector-core/src/metrics/mod.rs | 1 + lib/vector-core/src/metrics/storage.rs | 6 +- src/transforms/hist_summ.rs | 1078 ++++++++++++++++++++++++ src/transforms/mod.rs | 2 + 5 files changed, 1086 insertions(+), 3 deletions(-) create mode 100644 src/transforms/hist_summ.rs diff --git a/Cargo.toml b/Cargo.toml index 80281e7100..0508e519c1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -731,6 +731,7 @@ transforms-logs = [ transforms-metrics = [ "transforms-aggregate", "transforms-filter", + "transforms-hist_summ", "transforms-log_to_metric", "transforms-lua", "transforms-metric_to_log", @@ -741,6 +742,7 @@ transforms-metrics = [ transforms-aggregate = [] transforms-aws_ec2_metadata = ["dep:arc-swap"] +transforms-hist_summ = [] transforms-dedupe = ["transforms-impl-dedupe"] transforms-filter = [] transforms-log_to_metric = [] diff --git 
a/lib/vector-core/src/metrics/mod.rs b/lib/vector-core/src/metrics/mod.rs index cb853f17f1..7ca1b39c68 100644 --- a/lib/vector-core/src/metrics/mod.rs +++ b/lib/vector-core/src/metrics/mod.rs @@ -13,6 +13,7 @@ use metrics_util::layers::Layer; use snafu::Snafu; pub use self::ddsketch::{AgentDDSketch, BinMap, Config}; +pub use self::storage::Histogram; use self::{label_filter::VectorLabelFilter, recorder::Registry, recorder::VectorRecorder}; use crate::event::{Metric, MetricValue}; diff --git a/lib/vector-core/src/metrics/storage.rs b/lib/vector-core/src/metrics/storage.rs index d8955c11d3..7b69895c11 100644 --- a/lib/vector-core/src/metrics/storage.rs +++ b/lib/vector-core/src/metrics/storage.rs @@ -73,7 +73,7 @@ impl GaugeFn for AtomicF64 { } #[derive(Debug)] -pub(super) struct Histogram { +pub struct Histogram { buckets: Box<[(f64, AtomicU32); 20]>, count: AtomicU64, sum: AtomicF64, @@ -84,7 +84,7 @@ impl Histogram { const MIN_BUCKET_EXP: f64 = -6.0; const BUCKETS: usize = 20; - pub(crate) fn new() -> Self { + pub fn new() -> Self { // Box to avoid having this large array inline to the structure, blowing // out cache coherence. 
// @@ -153,7 +153,7 @@ impl Histogram { .collect() } - pub(super) fn make_metric(&self) -> MetricValue { + pub fn make_metric(&self) -> MetricValue { MetricValue::AggregatedHistogram { buckets: self.buckets(), count: self.count(), diff --git a/src/transforms/hist_summ.rs b/src/transforms/hist_summ.rs new file mode 100644 index 0000000000..d8df38a9e1 --- /dev/null +++ b/src/transforms/hist_summ.rs @@ -0,0 +1,1078 @@ +use std::collections::HashMap; +use std::sync::Arc; + +use regex::Regex; +use snafu::Snafu; +use vector_lib::config::LogNamespace; +use vector_lib::configurable::configurable_component; +use vector_lib::event::EventMetadata; +use vector_lib::event::metric::{Bucket, MetricData, MetricSeries, MetricValue, Quantile}; + +use crate::{ + config::{DataType, Input, OutputId, TransformConfig, TransformContext, TransformOutput}, + event::Event, + schema, + transforms::{FunctionTransform, OutputBuffer, Transform}, +}; + +/// Matching criteria for histograms. +#[configurable_component] +#[derive(Clone, Debug, Eq, PartialEq)] +#[serde(untagged, deny_unknown_fields)] +pub enum HistPredicate { + /// Match histograms whose name appears in `names`. + Exact { + /// Exact metric names to match. The same name must not appear across two rules. + names: Vec, + }, + + /// Match histograms whose name matches the regular expression `pattern`. + Pattern { + /// A regular expression evaluated against the metric name. + pattern: String, + }, +} + +/// Behavior when a matched histogram has `count == 0`. +#[configurable_component] +#[derive(Clone, Debug, Default, Eq, PartialEq)] +#[serde(rename_all = "snake_case")] +pub enum ZeroBehavior { + /// Drop the histogram; emit no summary. + Drop, + + /// Emit a summary with `count = 0`, `sum = 0.0`, and every quantile's value set to `0.0`. + #[serde(alias = "emit_zero")] + Zero, + + /// Emit a summary with `count = 0`, `sum = 0.0`, and every quantile's value set to `NaN`. 
+ #[default] + #[serde(alias = "emit_nan")] + Nan, +} + +/// A single summarization rule. +#[configurable_component] +#[derive(Clone, Debug, PartialEq)] +pub struct Policy { + /// Predicate selecting which histograms this rule applies to. + /// + /// Exact predicates take precedence over pattern predicates. In case of overlap among pattern + /// predicates, the first matching rule (in declaration order) wins. + #[configurable(metadata(docs::human_name = "Predicate"))] + #[serde(alias = "match")] + pub target: HistPredicate, + + /// Quantiles to publish on the output summary. Each value must lie in `[0.0, 1.0]`. + #[serde(default = "default_quantiles")] + #[configurable(metadata(docs::human_name = "Quantiles to publish"))] + #[serde(alias = "qtiles")] + #[serde(alias = "qs")] + pub quantiles: Vec, + + /// Behavior for histograms with `count == 0`. + #[serde(default)] + #[configurable(metadata(docs::human_name = "Behavior for zero-sample histograms"))] + #[serde(alias = "empty")] + pub zero: ZeroBehavior, +} + +fn default_quantiles() -> Vec { + vec![0.95, 0.99] +} + +const fn default_admit_unmatched() -> bool { + true +} + +/// Configuration for the `hist_summ` transform. +#[configurable_component(transform( + "hist_summ", + "Convert matching aggregated histogram metrics into aggregated summary metrics." +))] +#[derive(Clone, Debug, PartialEq)] +#[serde(deny_unknown_fields)] +pub struct HistSummConfig { + /// Summarization rules evaluated in declaration order. + #[serde(default)] + pub policies: Vec, + + /// Pass histograms that do not match any rule through unchanged. When `false`, unmatched + /// histograms are dropped. 
+ #[serde(default = "default_admit_unmatched")] + #[configurable(metadata(docs::human_name = "Pass unmatched histograms"))] + #[serde(alias = "admit_unknown")] + #[serde(alias = "allow_unknown")] + pub admit_unmatched: bool, +} + +impl Default for HistSummConfig { + fn default() -> Self { + Self { + policies: Vec::new(), + admit_unmatched: default_admit_unmatched(), + } + } +} + +impl_generate_config_from_default!(HistSummConfig); + +#[async_trait::async_trait] +#[typetag::serde(name = "hist_summ")] +impl TransformConfig for HistSummConfig { + async fn build(&self, _context: &TransformContext) -> crate::Result { + HistSumm::new(self).map(Transform::function) + } + + fn input(&self) -> Input { + Input::metric() + } + + fn outputs( + &self, + _: vector_lib::enrichment::TableRegistry, + _: &[(OutputId, schema::Definition)], + _: LogNamespace, + ) -> Vec { + vec![TransformOutput::new(DataType::Metric, HashMap::new())] + } +} + +#[derive(Debug, Snafu)] +pub enum BuildError { + #[snafu(display("policy {policy_idx}: exact predicate has no names"))] + EmptyExactList { policy_idx: usize }, + + #[snafu(display("policy {policy_idx}: exact predicate contains an empty name"))] + EmptyExactName { policy_idx: usize }, + + #[snafu(display("policy {policy_idx}: duplicate exact name `{name}`"))] + DuplicateExactName { policy_idx: usize, name: String }, + + #[snafu(display("policy {policy_idx}: quantile {value} is outside [0.0, 1.0]"))] + QuantileOutOfRange { policy_idx: usize, value: f64 }, + + #[snafu(display("policy {policy_idx}: empty quantile list"))] + EmptyQuantileList { policy_idx: usize }, + + #[snafu(display("policy {policy_idx}: invalid regex `{pattern}`: {source}"))] + InvalidPattern { + policy_idx: usize, + pattern: String, + source: regex::Error, + }, +} + +#[derive(Debug)] +struct RuleAction { + quantiles: Vec, + zero: ZeroBehavior, +} + +#[derive(Clone, Debug)] +pub struct HistSumm { + exact: HashMap>, + patterns: Vec<(Regex, Arc)>, + admit_unmatched: bool, +} + +impl 
HistSumm { + pub fn new(config: &HistSummConfig) -> crate::Result { + config + .policies + .iter() + .enumerate() + .try_fold( + (HashMap::new(), Vec::new()), + |(exact, patterns), (policy_idx, policy)| { + Self::absorb_policy(exact, patterns, policy_idx, policy) + }, + ) + .map(|(exact, patterns)| Self { + exact, + patterns, + admit_unmatched: config.admit_unmatched, + }) + .map_err(Into::into) + } + + fn absorb_policy( + exact: HashMap>, + mut patterns: Vec<(Regex, Arc)>, + policy_idx: usize, + policy: &Policy, + ) -> Result<(HashMap>, Vec<(Regex, Arc)>), BuildError> + { + Self::validate_quantiles(policy_idx, &policy.quantiles)?; + let action = Arc::new(RuleAction { + quantiles: policy.quantiles.clone(), + zero: policy.zero.clone(), + }); + + match &policy.target { + HistPredicate::Exact { names } => { + Self::validate_exact_names(policy_idx, names)?; + Self::merge_exact(exact, policy_idx, names, &action) + .map(|exact| (exact, patterns)) + } + HistPredicate::Pattern { pattern } => { + Self::compile_pattern(policy_idx, pattern).map(|regex| { + patterns.push((regex, action)); + (exact, patterns) + }) + } + } + } + + fn validate_quantiles(policy_idx: usize, quantiles: &[f64]) -> Result<(), BuildError> { + if quantiles.is_empty() { + return Err(BuildError::EmptyQuantileList { policy_idx }); + } + for &q in quantiles { + if !q.is_finite() || !(0.0..=1.0).contains(&q) { + return Err(BuildError::QuantileOutOfRange { policy_idx, value: q }); + } + } + Ok(()) + } + + fn validate_exact_names(policy_idx: usize, names: &[String]) -> Result<(), BuildError> { + if names.is_empty() { + return Err(BuildError::EmptyExactList { policy_idx }); + } + for n in names { + if n.is_empty() { + return Err(BuildError::EmptyExactName { policy_idx }); + } + } + Ok(()) + } + + fn merge_exact( + exact: HashMap>, + policy_idx: usize, + names: &[String], + action: &Arc, + ) -> Result>, BuildError> { + names.iter().try_fold(exact, |mut acc, name| { + if acc.contains_key(name) { + 
Err(BuildError::DuplicateExactName { + policy_idx, + name: name.clone(), + }) + } else { + acc.insert(name.clone(), Arc::clone(action)); + Ok(acc) + } + }) + } + + fn compile_pattern(policy_idx: usize, pattern: &str) -> Result { + Regex::new(pattern).map_err(|source| BuildError::InvalidPattern { + policy_idx, + pattern: pattern.to_string(), + source, + }) + } + + fn lookup(&self, name: &str) -> Option<&RuleAction> { + if let Some(action) = self.exact.get(name) { + Some(action.as_ref()) + } else { + self.patterns + .iter() + .find(|(re, _)| re.is_match(name)) + .map(|(_, action)| action.as_ref()) + } + } +} + +impl FunctionTransform for HistSumm { + fn transform(&mut self, output: &mut OutputBuffer, in_evt: Event) { + let out_evt = match in_evt { + Event::Metric(m) if matches!(&m.value(), MetricValue::AggregatedHistogram { .. }) => { + let (ser, mut data, meta) = m.into_parts(); + match (self.lookup(ser.name.name.as_str()), self.admit_unmatched) { + (Some(act), _) => { + let MetricValue::AggregatedHistogram { + buckets, + count, + sum, + } = std::mem::replace(&mut data.value, MetricValue::Counter { value: 0.0 }) else { + unreachable!("checked above"); + }; + let val = if count == 0 { + match act.zero { + ZeroBehavior::Drop => None, + ZeroBehavior::Zero => Some(0.0), + ZeroBehavior::Nan => Some(f64::NAN), + }.map(|v| zero_summary(&act.quantiles, v)) + } else { + Some(MetricValue::AggregatedSummary { + quantiles: estimate_quantiles(&buckets, count, &act.quantiles), + count, + sum, + }) + }; + val.map(|v| { + data.value = v; + mk_metric(ser, data, meta) + }) + }, + (None, true) => Some(mk_metric(ser, data, meta)), + _ => None + } + }, + other => Some(other), + }; + out_evt.map(|e| output.push(e)); + } +} + +fn mk_metric(ser: MetricSeries, data: MetricData, meta: EventMetadata) -> Event { + Event::Metric(vector_lib::event::Metric::from_parts(ser, data, meta)) +} + +fn zero_summary(qs: &[f64], value: f64) -> MetricValue { + MetricValue::AggregatedSummary { + quantiles: 
qs + .iter() + .map(|&quantile| Quantile { quantile, value }) + .collect(), + count: 0, + sum: 0.0, + } +} + +/// Estimate per-quantile values using Prometheus-style linear interpolation across +/// non-cumulative buckets. For target ranks falling in the `+Inf` bucket, returns +/// the previous finite `upper_limit`. +fn estimate_quantiles(buckets: &[Bucket], count: u64, qs: &[f64]) -> Vec { + let mut cum: Vec = Vec::with_capacity(buckets.len()); + let mut running: u64 = 0; + for b in buckets { + running = running.saturating_add(b.count); + cum.push(running); + } + + let count_f = count as f64; + qs + .iter() + .map(|&quantile| Quantile { + quantile, + value: interpolate(buckets, &cum, count_f, quantile), + }) + .collect() +} + +fn interpolate(buckets: &[Bucket], cum: &[u64], count: f64, q: f64) -> f64 { + let target = q * count; + match cum.iter().position(|&c| (c as f64) >= target) { + // Total cumulative count < target — only possible if `count` exceeds the sum of + // bucket counts (malformed input). Fall back to the largest finite upper limit. 
+ None => last_finite_upper(buckets).unwrap_or(0.0), + Some(i) if buckets[i].upper_limit.is_infinite() => last_finite_upper(&buckets[..i]).unwrap_or(0.0), + Some(i) => { + let prev_cum = if i == 0 { 0.0 } else { cum[i - 1] as f64 }; + let prev_upper = if i == 0 { + 0.0 + } else { + buckets[i - 1].upper_limit + }; + let bucket_count = (cum[i] as f64) - prev_cum; + if bucket_count == 0.0 { + prev_upper + } else { + prev_upper + + (buckets[i].upper_limit - prev_upper) * (target - prev_cum) / bucket_count + } + } + } +} + +fn last_finite_upper(buckets: &[Bucket]) -> Option { + buckets + .iter() + .rev() + .map(|b| b.upper_limit) + .find(|u| u.is_finite()) +} + +#[cfg(test)] +mod tests { + use indoc::indoc; + use metrics::HistogramFn; + use tokio::sync::mpsc; + use tokio_stream::wrappers::ReceiverStream; + use vector_lib::event::{Metric, MetricKind}; + use vector_lib::metrics::Histogram as CoreHistogram; + + use super::*; + use crate::test_util::components::assert_transform_compliance; + use crate::test_util::metrics::{get_counter, get_distribution, get_gauge, get_set}; + use crate::transforms::test::create_topology; + + // ============================================================================ + // Helpers + // ============================================================================ + + fn make_hist(name: &str, samples: &[f64]) -> Metric { + let h = CoreHistogram::new(); + for s in samples { + h.record(*s); + } + Metric::new(name, MetricKind::Absolute, h.make_metric()) + } + + fn run_transform(config: HistSummConfig, events: Vec) -> Vec { + let mut transform = HistSumm::new(&config).expect("build"); + let mut out = OutputBuffer::with_capacity(events.len().max(1)); + for ev in events { + transform.transform(&mut out, ev); + } + out.into_events().collect() + } + + fn quantiles_of(metric: &Metric) -> &[Quantile] { + match metric.value() { + MetricValue::AggregatedSummary { quantiles, .. 
} => quantiles, + other => panic!("expected AggregatedSummary, got {other:?}"), + } + } + + fn build_err(toml_str: &str) -> String { + let config: HistSummConfig = toml::from_str(toml_str).unwrap(); + HistSumm::new(&config).unwrap_err().to_string() + } + + // ============================================================================ + // Generated config + // ============================================================================ + + #[test] + fn generate_config() { + crate::test_util::test_generate_config::(); + } + + // ============================================================================ + // A. Config validation + // ============================================================================ + + #[test] + fn rejects_empty_exact_list() { + let msg = build_err(indoc!( + r#" + [[policies]] + target = { names = [] } + quantiles = [0.5] + "#, + )); + assert!(msg.contains("exact predicate has no names"), "{msg}"); + } + + #[test] + fn rejects_empty_exact_name() { + let msg = build_err(indoc!( + r#" + [[policies]] + target = { names = [""] } + quantiles = [0.5] + "#, + )); + assert!(msg.contains("empty name"), "{msg}"); + } + + #[test] + fn rejects_quantile_below_zero() { + let msg = build_err(indoc!( + r#" + [[policies]] + target = { names = ["x"] } + quantiles = [-0.1] + "#, + )); + assert!(msg.contains("outside [0.0, 1.0]"), "{msg}"); + } + + #[test] + fn rejects_quantile_above_one() { + let msg = build_err(indoc!( + r#" + [[policies]] + target = { names = ["x"] } + quantiles = [1.5] + "#, + )); + assert!(msg.contains("outside [0.0, 1.0]"), "{msg}"); + } + + #[test] + fn rejects_quantile_nan() { + // NaN cannot be expressed in TOML literally; build the config directly. 
+ let config = HistSummConfig { + policies: vec![Policy { + target: HistPredicate::Exact { + names: vec!["x".into()], + }, + quantiles: vec![f64::NAN], + zero: ZeroBehavior::default(), + }], + admit_unmatched: true, + }; + let msg = HistSumm::new(&config).unwrap_err().to_string(); + assert!(msg.contains("outside [0.0, 1.0]"), "{msg}"); + } + + #[test] + fn rejects_invalid_regex() { + let msg = build_err(indoc!( + r#" + [[policies]] + target = { pattern = "[" } + quantiles = [0.5] + "#, + )); + assert!(msg.contains("invalid regex"), "{msg}"); + } + + #[test] + fn rejects_explicit_empty_quantiles() { + let msg = build_err(indoc!( + r#" + [[policies]] + target = { names = ["x"] } + quantiles = [] + "#, + )); + assert!(msg.contains("empty quantile list"), "{msg}"); + } + + #[test] + fn rejects_duplicate_exact_name_across_rules() { + let msg = build_err(indoc!( + r#" + [[policies]] + target = { names = ["foo"] } + quantiles = [0.5] + + [[policies]] + target = { names = ["bar", "foo"] } + quantiles = [0.9] + "#, + )); + assert!(msg.contains("duplicate exact name `foo`"), "{msg}"); + } + + #[test] + fn rejects_duplicate_exact_name_within_rule() { + let msg = build_err(indoc!( + r#" + [[policies]] + target = { names = ["x", "x"] } + quantiles = [0.5] + "#, + )); + assert!(msg.contains("duplicate exact name `x`"), "{msg}"); + } + + // ============================================================================ + // B. 
Alias equality + // ============================================================================ + + fn assert_eq_toml(a: &str, b: &str) { + let a: HistSummConfig = toml::from_str(a).expect("a parses"); + let b: HistSummConfig = toml::from_str(b).expect("b parses"); + assert_eq!(a, b); + } + + #[test] + fn alias_match_for_target() { + assert_eq_toml( + indoc!( + r#" + [[policies]] + target = { names = ["x"] } + quantiles = [0.5] + "#, + ), + indoc!( + r#" + [[policies]] + match = { names = ["x"] } + quantiles = [0.5] + "#, + ), + ); + } + + #[test] + fn alias_qs_qtiles_for_quantiles() { + let canonical = indoc!( + r#" + [[policies]] + target = { names = ["x"] } + quantiles = [0.5, 0.95] + "#, + ); + for alias in ["qs", "qtiles"] { + let aliased = + format!("[[policies]]\ntarget = {{ names = [\"x\"] }}\n{alias} = [0.5, 0.95]\n"); + assert_eq_toml(canonical, &aliased); + } + } + + #[test] + fn alias_empty_for_zero() { + assert_eq_toml( + indoc!( + r#" + [[policies]] + target = { names = ["x"] } + zero = "drop" + "#, + ), + indoc!( + r#" + [[policies]] + target = { names = ["x"] } + empty = "drop" + "#, + ), + ); + } + + #[test] + fn alias_admit_unmatched() { + let canonical = "admit_unmatched = false\n"; + for alias in ["admit_unknown", "allow_unknown"] { + let aliased = format!("{alias} = false\n"); + assert_eq_toml(canonical, &aliased); + } + } + + #[test] + fn alias_emit_zero_for_zero_behavior() { + assert_eq_toml( + indoc!( + r#" + [[policies]] + target = { names = ["x"] } + zero = "zero" + "#, + ), + indoc!( + r#" + [[policies]] + target = { names = ["x"] } + zero = "emit_zero" + "#, + ), + ); + } + + #[test] + fn alias_emit_nan_for_zero_behavior() { + assert_eq_toml( + indoc!( + r#" + [[policies]] + target = { names = ["x"] } + zero = "nan" + "#, + ), + indoc!( + r#" + [[policies]] + target = { names = ["x"] } + zero = "emit_nan" + "#, + ), + ); + } + + // ============================================================================ + // C. 
Quantile correctness + // ============================================================================ + + fn policy(target: HistPredicate, quantiles: Vec) -> Policy { + Policy { + target, + quantiles, + zero: ZeroBehavior::default(), + } + } + + fn exact(names: &[&str]) -> HistPredicate { + HistPredicate::Exact { + names: names.iter().map(|s| (*s).to_string()).collect(), + } + } + + fn pattern(p: &str) -> HistPredicate { + HistPredicate::Pattern { pattern: p.into() } + } + + fn config_with(rules: Vec, admit_unmatched: bool) -> HistSummConfig { + HistSummConfig { + policies: rules, + admit_unmatched, + } + } + + #[test] + fn quantile_single_bucket() { + // 100 samples all in the (0.5, 1.0] bucket (upper_limit = 1.0). p50 target rank + // is 50, which lands at the bucket midpoint: 0.5 + 0.5 * 0.5 = 0.75. + let samples = vec![0.6; 100]; + let hist = make_hist("lat", &samples); + let cfg = config_with(vec![policy(exact(&["lat"]), vec![0.5])], false); + let out = run_transform(cfg, vec![hist.into()]); + assert_eq!(out.len(), 1); + let summary = out[0].as_metric(); + let qs = quantiles_of(summary); + assert_eq!(qs.len(), 1); + assert!((qs[0].value - 0.75).abs() < 1e-9, "value = {}", qs[0].value); + } + + #[test] + fn quantile_preserves_count_and_sum() { + let samples: Vec = (1..=10).map(|v| f64::from(v) * 0.25).collect(); + let expected_sum: f64 = samples.iter().sum(); + let expected_count = samples.len() as u64; + + let hist = make_hist("lat", &samples); + let cfg = config_with(vec![policy(exact(&["lat"]), vec![0.5, 0.9])], false); + let out = run_transform(cfg, vec![hist.into()]); + let summary = out[0].as_metric(); + match summary.value() { + MetricValue::AggregatedSummary { count, sum, .. } => { + assert_eq!(*count, expected_count); + assert!((*sum - expected_sum).abs() < 1e-9); + } + _ => panic!("not a summary"), + } + } + + #[test] + fn quantile_p100_falls_back_for_inf_bucket() { + // 99 small samples plus 1 enormous sample landing in +Inf. 
+ let mut samples = vec![0.6; 99]; + samples.push(1.0e30); + let hist = make_hist("lat", &samples); + let cfg = config_with(vec![policy(exact(&["lat"]), vec![1.0])], false); + let out = run_transform(cfg, vec![hist.into()]); + let qs = quantiles_of(out[0].as_metric()); + // p100 lands in the +Inf bucket, so we fall back to the previous finite upper. + assert!(qs[0].value.is_finite(), "value = {}", qs[0].value); + assert!(qs[0].value > 0.0); + } + + #[test] + fn quantile_multi_bucket() { + // 1000 samples spread across 7 of the 20 vector-core buckets. The cumulative + // counts (100, 300, 500, 700, 900, 990, 1000) are chosen so each target rank + // lands exactly on a bucket boundary: p50=500 -> end of bucket 6, p90=900 -> end + // of bucket 8, p99=990 -> end of bucket 9. Interpolation therefore collapses to + // the bucket's upper limit, giving exact f64 results. + // + // bucket index | bucket range | sample value | count + // -------------+-----------------+--------------+------ + // 4 | (0.125, 0.25 ] | 0.2 | 100 + // 5 | (0.25 , 0.5 ] | 0.3 | 200 + // 6 | (0.5 , 1.0 ] | 0.6 | 200 + // 7 | (1.0 , 2.0 ] | 1.5 | 200 + // 8 | (2.0 , 4.0 ] | 3.0 | 200 + // 9 | (4.0 , 8.0 ] | 5.0 | 90 + // 10 | (8.0 , 16.0 ] | 10.0 | 10 + let samples: Vec = [ + (0.2_f64, 100usize), + (0.3, 200), + (0.6, 200), + (1.5, 200), + (3.0, 200), + (5.0, 90), + (10.0, 10), + ] + .iter() + .flat_map(|&(v, n)| std::iter::repeat(v).take(n)) + .collect(); + + let hist = make_hist("lat", &samples); + let cfg = config_with( + vec![policy(exact(&["lat"]), vec![0.5, 0.9, 0.99])], + false, + ); + let out = run_transform(cfg, vec![hist.into()]); + let qs = quantiles_of(out[0].as_metric()); + + assert_eq!(qs.len(), 3); + assert_eq!(qs[0].quantile, 0.5); + assert_eq!(qs[0].value, 1.0); + assert_eq!(qs[1].quantile, 0.9); + assert_eq!(qs[1].value, 4.0); + assert_eq!(qs[2].quantile, 0.99); + assert_eq!(qs[2].value, 8.0); + } + + #[test] + fn quantile_two_bucket_split() { + // 80 small samples in (0.5, 1.0] 
and 20 larger samples in (1.0, 2.0]. + let mut samples = vec![0.6; 80]; + samples.extend(std::iter::repeat(1.5).take(20)); + let hist = make_hist("lat", &samples); + let cfg = config_with(vec![policy(exact(&["lat"]), vec![0.5, 0.9])], false); + let out = run_transform(cfg, vec![hist.into()]); + let qs = quantiles_of(out[0].as_metric()); + assert_eq!(qs.len(), 2); + assert!( + qs[0].value < qs[1].value, + "p50 {} >= p90 {}", + qs[0].value, + qs[1].value + ); + assert!(qs[0].value > 0.5 && qs[0].value <= 1.0); + assert!(qs[1].value > 1.0 && qs[1].value <= 2.0); + } + + #[test] + fn zero_behavior_drop_emits_nothing() { + let hist = make_hist("lat", &[]); + let cfg = config_with( + vec![Policy { + target: exact(&["lat"]), + quantiles: vec![0.5], + zero: ZeroBehavior::Drop, + }], + false, + ); + let out = run_transform(cfg, vec![hist.into()]); + assert!(out.is_empty(), "expected no events, got {}", out.len()); + } + + #[test] + fn zero_behavior_zero_emits_zero_values() { + let hist = make_hist("lat", &[]); + let cfg = config_with( + vec![Policy { + target: exact(&["lat"]), + quantiles: vec![0.5, 0.99], + zero: ZeroBehavior::Zero, + }], + false, + ); + let out = run_transform(cfg, vec![hist.into()]); + let summary = out[0].as_metric(); + match summary.value() { + MetricValue::AggregatedSummary { + quantiles, + count, + sum, + } => { + assert_eq!(*count, 0); + assert_eq!(*sum, 0.0); + assert!(quantiles.iter().all(|q| q.value == 0.0)); + } + _ => panic!("not a summary"), + } + } + + #[test] + fn zero_behavior_nan_emits_nan_values() { + let hist = make_hist("lat", &[]); + let cfg = config_with( + vec![Policy { + target: exact(&["lat"]), + quantiles: vec![0.5, 0.99], + zero: ZeroBehavior::Nan, + }], + false, + ); + let out = run_transform(cfg, vec![hist.into()]); + let summary = out[0].as_metric(); + match summary.value() { + MetricValue::AggregatedSummary { + quantiles, + count, + sum, + } => { + assert_eq!(*count, 0); + assert_eq!(*sum, 0.0); + 
assert!(quantiles.iter().all(|q| q.value.is_nan())); + } + _ => panic!("not a summary"), + } + } + + // ============================================================================ + // D. Precedence + // ============================================================================ + + #[test] + fn exact_wins_over_pattern() { + // Pattern matches anything; Exact rule is declared second but must still win. + let cfg = config_with( + vec![ + policy(pattern(".*"), vec![0.5]), + policy(exact(&["lat"]), vec![0.9, 0.99]), + ], + false, + ); + let hist = make_hist("lat", &[0.6; 50]); + let out = run_transform(cfg, vec![hist.into()]); + let qs = quantiles_of(out[0].as_metric()); + // Exact has two quantiles; pattern has only one. Quantile count proves which rule fired. + assert_eq!(qs.len(), 2); + assert_eq!(qs[0].quantile, 0.9); + assert_eq!(qs[1].quantile, 0.99); + } + + #[test] + fn first_pattern_wins() { + let cfg = config_with( + vec![ + policy(pattern("^l"), vec![0.5]), + policy(pattern("^lat"), vec![0.9, 0.99]), + ], + false, + ); + let hist = make_hist("lat", &[0.6; 50]); + let out = run_transform(cfg, vec![hist.into()]); + let qs = quantiles_of(out[0].as_metric()); + assert_eq!(qs.len(), 1); + assert_eq!(qs[0].quantile, 0.5); + } + + // ============================================================================ + // E. 
Unmatched + non-histogram passthrough + // ============================================================================ + + #[test] + fn unmatched_admitted_passes_through_unchanged() { + let cfg = config_with(vec![policy(exact(&["other"]), vec![0.5])], true); + let hist = make_hist("lat", &[0.6; 5]); + let original_value = hist.value().clone(); + let out = run_transform(cfg, vec![hist.into()]); + assert_eq!(out.len(), 1); + assert_eq!(out[0].as_metric().value(), &original_value); + } + + #[test] + fn unmatched_not_admitted_is_dropped() { + let cfg = config_with(vec![policy(exact(&["other"]), vec![0.5])], false); + let hist = make_hist("lat", &[0.6; 5]); + let out = run_transform(cfg, vec![hist.into()]); + assert!(out.is_empty()); + } + + #[test] + fn non_histogram_metrics_pass_through() { + // Exhaustive over every `MetricValue` variant except AggregatedHistogram. + let summary = Metric::new( + "summary", + MetricKind::Absolute, + MetricValue::AggregatedSummary { + quantiles: vec![Quantile { + quantile: 0.5, + value: 1.0, + }], + count: 1, + sum: 1.0, + }, + ); + let sketch = Metric::new( + "sketch", + MetricKind::Absolute, + MetricValue::from(vector_lib::metrics::AgentDDSketch::with_agent_defaults()), + ); + + let cfg = config_with(Vec::new(), false); + let inputs: Vec = vec![ + get_counter(1.0, MetricKind::Absolute).into(), + get_gauge(1.0, MetricKind::Absolute).into(), + get_set(vec!["a"], MetricKind::Absolute).into(), + get_distribution(vec![1.0_f64], MetricKind::Absolute).into(), + summary.into(), + sketch.into(), + ]; + let expected: Vec<_> = inputs + .iter() + .map(|e| e.as_metric().value().clone()) + .collect(); + let out = run_transform(cfg, inputs); + assert_eq!(out.len(), expected.len()); + for (event, expected_value) in out.iter().zip(expected.iter()) { + assert_eq!(event.as_metric().value(), expected_value); + } + } + + // ============================================================================ + // F. 
Topology-level smoke + // ============================================================================ + + #[tokio::test] + async fn topology_passthrough_for_unmatched() { + let config = toml::from_str::(indoc!( + r#" + admit_unmatched = true + + [[policies]] + target = { names = ["matched"] } + quantiles = [0.5] + "#, + )) + .unwrap(); + + assert_transform_compliance(async move { + let (tx, rx) = mpsc::channel(1); + let (topology, mut out) = create_topology(ReceiverStream::new(rx), config).await; + + let hist: Event = make_hist("unmatched", &[0.6; 10]).into(); + tx.send(hist).await.unwrap(); + + let received = out.recv().await.unwrap(); + assert!(matches!( + received.as_metric().value(), + MetricValue::AggregatedHistogram { .. } + )); + + drop(tx); + topology.stop().await; + assert_eq!(out.recv().await, None); + }) + .await; + } + + #[tokio::test] + async fn topology_converts_matched_histogram() { + let config = toml::from_str::(indoc!( + r#" + admit_unmatched = false + + [[policies]] + target = { names = ["matched"] } + quantiles = [0.5, 0.99] + "#, + )) + .unwrap(); + + assert_transform_compliance(async move { + let (tx, rx) = mpsc::channel(1); + let (topology, mut out) = create_topology(ReceiverStream::new(rx), config).await; + + let hist: Event = make_hist("matched", &[0.6; 100]).into(); + tx.send(hist).await.unwrap(); + + let received = out.recv().await.unwrap(); + assert!(matches!( + received.as_metric().value(), + MetricValue::AggregatedSummary { .. 
} + )); + + drop(tx); + topology.stop().await; + assert_eq!(out.recv().await, None); + }) + .await; + } + +} diff --git a/src/transforms/mod.rs b/src/transforms/mod.rs index a0037b5cd4..fdd2f7fe0a 100644 --- a/src/transforms/mod.rs +++ b/src/transforms/mod.rs @@ -15,6 +15,8 @@ pub mod aws_ec2_metadata; mod exclusive_route; #[cfg(feature = "transforms-filter")] pub mod filter; +#[cfg(feature = "transforms-hist_summ")] +pub mod hist_summ; #[cfg(feature = "transforms-log_to_metric")] pub mod log_to_metric; #[cfg(feature = "transforms-lua")] From 8793adb91a03c771639e92b50b923d78916284e2 Mon Sep 17 00:00:00 2001 From: Janmejay Singh Date: Wed, 17 Jun 2026 19:08:44 +0530 Subject: [PATCH 9/9] Feat[OBE-10264] - Augment top-level test to use multiple policies --- src/transforms/hist_summ.rs | 48 ++++++++++++++++++++++++++++++------- 1 file changed, 40 insertions(+), 8 deletions(-) diff --git a/src/transforms/hist_summ.rs b/src/transforms/hist_summ.rs index d8df38a9e1..b228d4fb36 100644 --- a/src/transforms/hist_summ.rs +++ b/src/transforms/hist_summ.rs @@ -1044,13 +1044,20 @@ mod tests { #[tokio::test] async fn topology_converts_matched_histogram() { + // Two policies fan in: one exact-name, one regex. Each emits a summary with a + // distinct quantile count so we can tell which policy fired. A third histogram + // matches neither and (with admit_unmatched = false) must be dropped. let config = toml::from_str::(indoc!( r#" admit_unmatched = false [[policies]] - target = { names = ["matched"] } - quantiles = [0.5, 0.99] + target = { names = ["http_request_duration_seconds"] } + quantiles = [0.5] + + [[policies]] + target = { pattern = "^db_.*_seconds$" } + quantiles = [0.9, 0.99] "#, )) .unwrap(); @@ -1059,14 +1066,39 @@ mod tests { let (tx, rx) = mpsc::channel(1); let (topology, mut out) = create_topology(ReceiverStream::new(rx), config).await; - let hist: Event = make_hist("matched", &[0.6; 100]).into(); - tx.send(hist).await.unwrap(); + // Exact-name policy fires. 
+ let exact_hist: Event = + make_hist("http_request_duration_seconds", &[0.6; 100]).into(); + tx.send(exact_hist).await.unwrap(); + let received = out.recv().await.unwrap(); + let metric = received.as_metric(); + assert_eq!(metric.name(), "http_request_duration_seconds"); + match metric.value() { + MetricValue::AggregatedSummary { quantiles, .. } => { + assert_eq!(quantiles.len(), 1); + assert_eq!(quantiles[0].quantile, 0.5); + } + other => panic!("expected AggregatedSummary, got {other:?}"), + } + // Regex policy fires. + let pattern_hist: Event = make_hist("db_query_duration_seconds", &[0.6; 100]).into(); + tx.send(pattern_hist).await.unwrap(); let received = out.recv().await.unwrap(); - assert!(matches!( - received.as_metric().value(), - MetricValue::AggregatedSummary { .. } - )); + let metric = received.as_metric(); + assert_eq!(metric.name(), "db_query_duration_seconds"); + match metric.value() { + MetricValue::AggregatedSummary { quantiles, .. } => { + assert_eq!(quantiles.len(), 2); + assert_eq!(quantiles[0].quantile, 0.9); + assert_eq!(quantiles[1].quantile, 0.99); + } + other => panic!("expected AggregatedSummary, got {other:?}"), + } + + // No policy matches, admit_unmatched = false -> dropped, no output. + let unmatched_hist: Event = make_hist("memcache_hit_rate", &[0.6; 100]).into(); + tx.send(unmatched_hist).await.unwrap(); drop(tx); topology.stop().await;