Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions lib/vector-buffers/examples/buffer_perf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use vector_buffers::{
builder::TopologyBuilder,
channel::{BufferReceiver, BufferSender},
},
BufferType, Bufferable, EventCount, WhenFull,
BufferType, EventCount, TimedBufferable, WhenFull,
};
use vector_common::byte_size_of::ByteSizeOf;
use vector_common::finalization::{
Expand Down Expand Up @@ -244,7 +244,7 @@ fn generate_record_cache(min: usize, max: usize) -> Vec<VariableMessage> {

async fn generate_buffer<T>(buffer_type: &str) -> (BufferSender<T>, BufferReceiver<T>)
where
T: Bufferable + Clone + Finalizable,
T: TimedBufferable + Finalizable,
{
let data_dir = PathBuf::from("/tmp/vector");
let id = format!("{}-buffer-perf-testing", buffer_type);
Expand Down
34 changes: 28 additions & 6 deletions lib/vector-buffers/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::{
channel::{BufferReceiver, BufferSender},
},
variants::{DiskV2Buffer, MemoryBuffer},
Bufferable, WhenFull,
WhenFull,
};

#[derive(Debug, Snafu)]
Expand Down Expand Up @@ -274,7 +274,7 @@ impl BufferType {
id: String,
) -> Result<(), BufferBuildError>
where
T: Bufferable + Clone + Finalizable,
T: crate::TimedBufferable + Finalizable,
{
match *self {
BufferType::Memory {
Expand Down Expand Up @@ -366,20 +366,42 @@ impl BufferConfig {
pub async fn build<T>(
&self,
data_dir: Option<PathBuf>,
// NOTE(review): this hunk appears to be rendered without +/- markers. `buffer_id: String`
// looks like the removed parameter and the `id: impl Into<...>` line right after it looks like
// its replacement — confirm against the actual file before relying on this listing.
buffer_id: String,
id: impl Into<vector_common::config::ComponentKey>,
span: Span,
) -> Result<(BufferSender<T>, BufferReceiver<T>), BufferBuildError>
where
// NOTE(review): likewise, the first bound below looks like the removed line and the second
// like the added one — confirm.
T: Bufferable + Clone + Finalizable,
T: crate::TimedBufferable + Finalizable,
{
// Thin wrapper: forwards every argument to `build_with_clock`, supplying a freshly allocated
// `SystemClock` as the clock.
self.build_with_clock(data_dir, id, span, std::sync::Arc::new(crate::SystemClock))
.await
}

/// Like [`build`](Self::build) but accepts an explicit clock, primarily for tests that want
/// deterministic queue-delay measurements.
///
/// # Errors
///
/// Same as [`build`](Self::build).
// NOTE(review): `data_dir` is only ever cloned in the body below, which is presumably the
// by-value parameter this `allow` is silencing — confirm.
#[allow(clippy::needless_pass_by_value)]
pub async fn build_with_clock<T>(
&self,
data_dir: Option<PathBuf>,
id: impl Into<vector_common::config::ComponentKey>,
span: Span,
clock: std::sync::Arc<dyn crate::Clock>,
) -> Result<(BufferSender<T>, BufferReceiver<T>), BufferBuildError>
where
T: crate::TimedBufferable + Finalizable,
{
// Convert the caller-supplied identifier once; it is borrowed per stage and finally moved
// into the topology builder.
let component_key = id.into();
let mut builder = TopologyBuilder::default();

// Register every configured stage with the builder; the first failing stage aborts via `?`.
for stage in self.stages() {
// NOTE(review): this hunk appears to be rendered without +/- markers. The `buffer_id` call
// looks like the removed line and the `component_key` call like its replacement — confirm.
stage.add_to_builder(&mut builder, data_dir.clone(), buffer_id.clone())?;
stage.add_to_builder(&mut builder, data_dir.clone(), component_key.id().to_string())?;
}

// NOTE(review): as above, `.build(buffer_id, span)` looks like the removed call and
// `.build_with_clock(...)` like the added one — confirm.
// A builder failure is wrapped with the `FailedToBuildTopologySnafu` context.
builder
.build(buffer_id, span)
.build_with_clock(component_key, span, clock)
.await
.context(FailedToBuildTopologySnafu)
}
Expand Down
12 changes: 12 additions & 0 deletions lib/vector-buffers/src/internal_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,3 +137,15 @@ registered_event! {
self.send_duration.record(duration);
}
}

// Registered event backing the `topology_queue_delay_seconds` histogram.
// NOTE(review): the name suggests it tracks how long items wait in a buffer stage; the call
// site is not visible here — confirm.
registered_event! {
BufferQueueDelay {
// Stage index; rendered as a string and attached to the histogram as the `stage` label.
stage: usize,
} => {
queue_delay: Histogram = histogram!("topology_queue_delay_seconds", "stage" => self.stage.to_string()),
}

// Records one `duration` sample into the histogram.
fn emit(&self, duration: Duration) {
self.queue_delay.record(duration);
}
}
3 changes: 3 additions & 0 deletions lib/vector-buffers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,11 @@ mod internal_events;

#[cfg(test)]
pub mod test;
mod timed;
pub mod topology;

pub use timed::{Clock, SystemClock, Timed, TimedBufferable, TimedEncodable};

pub(crate) mod variants;

use std::fmt::Debug;
Expand Down
4 changes: 2 additions & 2 deletions lib/vector-buffers/src/test/variant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::{
channel::{BufferReceiver, BufferSender},
},
variants::{DiskV2Buffer, MemoryBuffer},
Bufferable, WhenFull,
TimedBufferable, WhenFull,
};

#[cfg(test)]
Expand Down Expand Up @@ -45,7 +45,7 @@ pub enum Variant {
impl Variant {
pub async fn create_sender_receiver<T>(&self) -> (BufferSender<T>, BufferReceiver<T>)
where
T: Bufferable + Clone + Finalizable,
T: TimedBufferable + Finalizable,
{
let mut builder = TopologyBuilder::default();
match self {
Expand Down
221 changes: 221 additions & 0 deletions lib/vector-buffers/src/timed.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
//! Wrapper carrying an enqueue timestamp alongside an item moving through a buffer.

use std::time::{Duration, SystemTime};

use bytes::{Buf, BufMut};
use vector_common::{
byte_size_of::ByteSizeOf,
finalization::{AddBatchNotifier, BatchNotifier, EventFinalizers, Finalizable},
};

use crate::{
encoding::{Encodable, FixedEncodable},
EventCount,
};

/// Abstraction over "what time is it right now?".
///
/// Production implementations are expected to answer with `SystemTime::now()`, while tests can
/// plug in a controllable implementation so that elapsed-time assertions do not depend on the
/// real clock.
pub trait Clock: Send + Sync + 'static {
    /// Returns the current wall-clock time as seen by this clock.
    fn now(&self) -> SystemTime;
}

/// [`Clock`] implementation that reads the operating system's wall clock.
#[derive(Clone, Copy, Debug, Default)]
pub struct SystemClock;

impl Clock for SystemClock {
    /// Returns whatever `SystemTime::now()` reports at the moment of the call.
    fn now(&self) -> SystemTime {
        SystemTime::now()
    }
}

/// A bufferable item paired with the wall-clock time at which it entered a buffer.
///
/// `enq_tm == None` denotes "unknown enqueue time" — typically a record decoded from a buffer
/// written by a Vector binary that predates this field.
///
/// Equality compares both the payload and the timestamp and is available whenever `T` itself is
/// comparable, so whole values can be checked with `assert_eq!`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Timed<T> {
    /// The wrapped item.
    pub inner: T,
    /// Wall-clock time at which the item was enqueued, or `None` when that time is unknown.
    pub enq_tm: Option<SystemTime>,
}

impl<T> Timed<T> {
    /// Wraps `inner` and records `clock`'s current reading as its enqueue time.
    pub fn stamped(inner: T, clock: &dyn Clock) -> Self {
        let enq_tm = Some(clock.now());
        Self { inner, enq_tm }
    }

    /// Wraps `inner` without recording an enqueue time.
    pub fn untimed(inner: T) -> Self {
        Self { inner, enq_tm: None }
    }

    /// Time elapsed between the recorded enqueue time and `clock`'s current reading.
    ///
    /// Yields `Duration::ZERO` when no enqueue time was recorded (the clock is not consulted in
    /// that case), and also when the clock reads earlier than the enqueue time, e.g. after an
    /// NTP correction moved it backwards.
    pub fn elapsed(&self, clock: &dyn Clock) -> Duration {
        self.enq_tm
            .and_then(|enqueued_at| clock.now().duration_since(enqueued_at).ok())
            .unwrap_or(Duration::ZERO)
    }

    /// Consumes the wrapper, dropping the timestamp and handing back the wrapped item.
    pub fn into_inner(self) -> T {
        let Self { inner, .. } = self;
        inner
    }
}

impl<T: ByteSizeOf> ByteSizeOf for Timed<T> {
fn allocated_bytes(&self) -> usize {
self.inner.allocated_bytes()
}
}

impl<T: EventCount> EventCount for Timed<T> {
fn event_count(&self) -> usize {
self.inner.event_count()
}
}

impl<T: AddBatchNotifier> AddBatchNotifier for Timed<T> {
fn add_batch_notifier(&mut self, notifier: BatchNotifier) {
self.inner.add_batch_notifier(notifier);
}
}

impl<T: Finalizable> Finalizable for Timed<T> {
fn take_finalizers(&mut self) -> EventFinalizers {
self.inner.take_finalizers()
}
}

/// An [`Encodable`] whose encode and decode paths are additionally given an optional enqueue
/// timestamp.
///
/// Whether the timestamp actually survives a round trip is up to the implementation.
pub trait TimedEncodable: Encodable {
    /// Encodes `self` into `buffer`, with `enq_tm` available to be written alongside it.
    ///
    /// # Errors
    ///
    /// Returns `Self::EncodeError` when the implementation fails to encode the value.
    fn encode_with_enq_tm<B>(
        self,
        enq_tm: Option<SystemTime>,
        buffer: &mut B,
    ) -> Result<(), Self::EncodeError>
    where
        B: BufMut;

    /// Decodes a value from `buffer`, returning it together with the enqueue timestamp the
    /// implementation recovered, if any.
    ///
    /// # Errors
    ///
    /// Returns `Self::DecodeError` when the implementation fails to decode the value.
    fn decode_with_enq_tm<B>(
        metadata: Self::Metadata,
        buffer: B,
    ) -> Result<(Self, Option<SystemTime>), Self::DecodeError>
    where
        B: Buf + Clone;
}

impl<T: FixedEncodable> TimedEncodable for T {
fn encode_with_enq_tm<B: BufMut>(
self,
_enq_tm: Option<SystemTime>,
buffer: &mut B,
) -> Result<(), Self::EncodeError> {
<Self as Encodable>::encode(self, buffer)
}

fn decode_with_enq_tm<B: Buf + Clone>(
metadata: Self::Metadata,
buffer: B,
) -> Result<(Self, Option<SystemTime>), Self::DecodeError> {
<Self as Encodable>::decode(metadata, buffer).map(|v| (v, None))
}
}

/// Shorthand bound for items that are [`TimedEncodable`], satisfy `InMemoryBufferable`, and are
/// `Clone`.
///
/// The trait has no methods of its own and is implemented automatically for every type that
/// meets those three bounds.
pub trait TimedBufferable: TimedEncodable + crate::InMemoryBufferable + Clone {}

impl<T> TimedBufferable for T where T: TimedEncodable + crate::InMemoryBufferable + Clone {}

/// Makes a [`Timed`] wrapper encodable by deferring to the wrapped type's [`TimedEncodable`]
/// implementation; metadata and error types are those of the wrapped type.
impl<T> Encodable for Timed<T>
where
    T: TimedEncodable,
{
    type Metadata = T::Metadata;
    type EncodeError = T::EncodeError;
    type DecodeError = T::DecodeError;

    fn get_metadata() -> Self::Metadata {
        T::get_metadata()
    }

    fn can_decode(metadata: Self::Metadata) -> bool {
        T::can_decode(metadata)
    }

    /// Splits the wrapper apart and lets the payload encode itself along with the timestamp.
    fn encode<B: BufMut>(self, buffer: &mut B) -> Result<(), Self::EncodeError> {
        let Timed { inner, enq_tm } = self;
        inner.encode_with_enq_tm(enq_tm, buffer)
    }

    /// Decodes the payload and whatever timestamp it yields, then reassembles the wrapper.
    fn decode<B: Buf + Clone>(
        metadata: Self::Metadata,
        buffer: B,
    ) -> Result<Self, Self::DecodeError> {
        let (inner, enq_tm) = T::decode_with_enq_tm(metadata, buffer)?;
        Ok(Timed { inner, enq_tm })
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Mutex;

    use super::*;

    /// Manually driven [`Clock`]: its reading only changes when a test calls `advance` or `set`,
    /// which keeps elapsed-time assertions deterministic.
    pub(crate) struct MockClock(Mutex<SystemTime>);

    impl MockClock {
        /// Creates a clock that initially reads `t`.
        pub fn new(t: SystemTime) -> Self {
            Self(Mutex::new(t))
        }

        /// Moves the clock forward by `by`.
        pub fn advance(&self, by: Duration) {
            *self.0.lock().unwrap() += by;
        }

        /// Overwrites the clock's reading with `t`, which may lie in the past.
        pub fn set(&self, t: SystemTime) {
            let mut current = self.0.lock().unwrap();
            *current = t;
        }
    }

    impl Clock for MockClock {
        fn now(&self) -> SystemTime {
            let current = self.0.lock().unwrap();
            *current
        }
    }

    /// Fixed starting instant shared by the tests below.
    fn base_time() -> SystemTime {
        SystemTime::UNIX_EPOCH + Duration::from_secs(1_700_000_000)
    }

    #[test]
    fn stamped_records_clock_now() {
        let start = base_time();
        let mock = MockClock::new(start);

        let item = Timed::stamped(42_u64, &mock);

        assert_eq!(item.enq_tm, Some(start));
        assert_eq!(item.inner, 42);
    }

    #[test]
    fn untimed_has_no_timestamp() {
        let item = Timed::untimed(42_u64);

        assert_eq!(item.enq_tm, None);
    }

    #[test]
    fn elapsed_reads_delta_from_clock() {
        let mock = MockClock::new(base_time());
        let item = Timed::stamped(0_u64, &mock);

        mock.advance(Duration::from_millis(250));

        assert_eq!(item.elapsed(&mock), Duration::from_millis(250));
    }

    #[test]
    fn elapsed_is_zero_when_clock_moved_backwards() {
        let start = base_time();
        let mock = MockClock::new(start);
        let item = Timed::stamped(0_u64, &mock);

        // Rewind the clock to before the enqueue time.
        mock.set(start - Duration::from_secs(60));

        assert_eq!(item.elapsed(&mock), Duration::ZERO);
    }

    #[test]
    fn elapsed_is_zero_when_enq_tm_is_none() {
        let mock = MockClock::new(SystemTime::UNIX_EPOCH);
        let item: Timed<u64> = Timed::untimed(0);

        assert_eq!(item.elapsed(&mock), Duration::ZERO);
    }
}
Loading