From 91e216b1956bd3b5047259adc82e8bab2c6776d6 Mon Sep 17 00:00:00 2001 From: akshayakumar-t Date: Mon, 15 Jun 2026 12:08:30 +0530 Subject: [PATCH 1/8] OBE-10327: rejection reports for splunk hec / in general --- src/sinks/elasticsearch/config.rs | 5 +- src/sinks/elasticsearch/mod.rs | 32 +------ src/sinks/elasticsearch/service.rs | 114 ++++++++++--------------- src/sinks/splunk_hec/common/service.rs | 80 ++++++++++++++++- src/sinks/splunk_hec/logs/config.rs | 22 ++++- src/sinks/splunk_hec/logs/tests.rs | 11 ++- src/sinks/splunk_hec/metrics/config.rs | 21 ++++- src/sinks/splunk_hec/metrics/tests.rs | 3 +- src/sinks/util/mod.rs | 2 + src/sinks/util/rejection_report.rs | 111 ++++++++++++++++++++++++ 10 files changed, 292 insertions(+), 109 deletions(-) create mode 100644 src/sinks/util/rejection_report.rs diff --git a/src/sinks/elasticsearch/config.rs b/src/sinks/elasticsearch/config.rs index 9dc1a9282..b9f48ef00 100644 --- a/src/sinks/elasticsearch/config.rs +++ b/src/sinks/elasticsearch/config.rs @@ -16,7 +16,7 @@ use crate::{ elasticsearch::{ health::ElasticsearchHealthLogic, retry::ElasticsearchRetryLogic, - service::{ElasticsearchService, HttpRequestBuilder, Telemetry}, + service::{ElasticsearchRejectionContext, ElasticsearchService, HttpRequestBuilder, Telemetry}, sink::ElasticsearchSink, ElasticsearchApiVersion, ElasticsearchAuthConfig, ElasticsearchCommon, ElasticsearchCommonMode, ElasticsearchMode, RejectionReport, VersionType, @@ -573,6 +573,7 @@ impl SinkConfig for ElasticsearchConfig { "end_pt" => endpt_str, ), }; + let context = std::sync::Arc::new(ElasticsearchRejectionContext { telemetry }); let http_request_builder = HttpRequestBuilder::new(&common, self); let service = ElasticsearchService::new( @@ -580,7 +581,7 @@ impl SinkConfig for ElasticsearchConfig { http_request_builder, self.rejection_report.clone(), self.compression.clone(), - telemetry); + context); (endpoint, service) }) diff --git a/src/sinks/elasticsearch/mod.rs b/src/sinks/elasticsearch/mod.rs index 0ae67d00c..dfa2977c9 100644 --- a/src/sinks/elasticsearch/mod.rs +++ b/src/sinks/elasticsearch/mod.rs @@ -77,37 +77,7 @@ impl Default for ElasticsearchMode { } } -/// Elasticsearch rejection reporting mode. -#[configurable_component] -#[derive(Clone, Debug, Eq, PartialEq)] -#[serde(deny_unknown_fields, rename_all = "snake_case")] -pub enum RejectionReport { - /// Do not print rejection response, only increment stats - #[serde(alias = "normal")] - Stats, - - /// Report entire response, the response may be large and likely not too useful - Response, - - /// Report entire request and response may be huge, with smaller batch-sizes this - /// can be useful to debug issues - RequestResponse, -} - -impl Default for RejectionReport { - fn default() -> Self { - Self::Stats - } -} - -impl RejectionReport { - pub fn needs_request(&self) -> bool { - match self { - Self::RequestResponse => true, - _ => false, - } - } -} +pub use crate::sinks::util::RejectionReport; /// Bulk API actions. #[configurable_component] diff --git a/src/sinks/elasticsearch/service.rs b/src/sinks/elasticsearch/service.rs index 377805b57..7155bff0b 100644 --- a/src/sinks/elasticsearch/service.rs +++ b/src/sinks/elasticsearch/service.rs @@ -24,7 +24,7 @@ use crate::{ sinks::util::{ auth::Auth, http::{HttpBatchService, RequestConfig}, - Compression, ElementCount, Decompressor, + Compression, ElementCount, emit_rejection_error, RejectionContext, }, }; @@ -77,6 +77,30 @@ pub struct Telemetry { pub indexed: Counter, } +pub struct ElasticsearchRejectionContext { + pub telemetry: Telemetry, +} + +impl RejectionContext for ElasticsearchRejectionContext { + fn log_category(&self) -> &'static str { + "es_rej_rpt" + } + + fn error_code(&self, status: u16) -> String { + format!("http_response_{status}") + } + + fn error_message(&self, status: u16, body: &Bytes) -> String { + err_summary(status, body).msg + } + + fn record_rejection(&self, status: u16, body: &Bytes) { + let s = err_summary(status, body); + self.telemetry.rejected.increment(s.rejected); + self.telemetry.indexed.increment(s.indexed); + } +} + #[derive(Clone)] pub struct ElasticsearchService { // TODO: `HttpBatchService` has been deprecated for direct use in sinks. @@ -88,7 +112,7 @@ pub struct ElasticsearchService { >, rej_rpt: RejectionReport, compression: Compression, - telemetry: Telemetry, + context: Arc, } impl ElasticsearchService { @@ -97,7 +121,7 @@ impl ElasticsearchService { http_request_builder: HttpRequestBuilder, rej_rpt: RejectionReport, compression: Compression, - telemetry: Telemetry, + context: Arc, ) -> ElasticsearchService { let http_request_builder = Arc::new(http_request_builder); let batch_service = HttpBatchService::new(http_client, move |req| { @@ -106,7 +130,7 @@ impl ElasticsearchService { Box::pin(async move { request_builder.build_request(req).await }); future }); - ElasticsearchService { batch_service, rej_rpt, compression, telemetry } + ElasticsearchService { batch_service, rej_rpt, compression, context } } } @@ -213,14 +237,14 @@ impl Service for ElasticsearchService { } else { None }; - let telemetry = self.telemetry.clone(); + let context = Arc::clone(&self.context); Box::pin(async move { http_service.ready().await?; let events_byte_size = std::mem::take(req.metadata_mut()).into_events_estimated_json_encoded_byte_size(); let http_response = http_service.call(req).await?; - let event_status = get_event_status(&http_response, req_for_rpt, rej_rpt, telemetry); + let event_status = get_event_status(&http_response, req_for_rpt, rej_rpt, &context); Ok(ElasticsearchResponse { event_status, http_response, @@ -230,8 +254,6 @@ impl Service for ElasticsearchService { } } -const ES_REJ_RPT: &str = "es_rej_rpt"; - fn response_frag(key: &str, val_prefix: &str) -> String { format!("\"{key}\":{val_prefix}") } @@ -244,97 +266,55 @@ struct ErrSummary { rejected: u64, } -fn err_summary(response: &Response) -> ErrSummary { - let body = String::from_utf8_lossy(response.body()); - let i = - body +fn err_summary(status: u16, body: &Bytes) -> ErrSummary { + let body_str = String::from_utf8_lossy(body); + let i: u64 = + body_str .match_indices(response_frag("status", "201").as_str()) .count() .try_into() .unwrap(); - let r = - body + let r: u64 = + body_str .match_indices(response_frag("status", "400").as_str()) .count() .try_into() .unwrap(); ErrSummary { - error_code: format!("http_response_{}", response.status().as_u16()), + error_code: format!("http_response_{status}"), msg: format!("Request contained errors (indexed: {i}, rejected: {r})."), indexed: i, rejected: r } } -fn emit_bad_response_error( - response: &Response, - request: Option<(ElasticsearchRequest, Compression)>, - rej_rpt: RejectionReport, - telemetry: Telemetry, -) { - let err_summary = err_summary(response); - telemetry.indexed.increment(err_summary.indexed); - telemetry.rejected.increment(err_summary.rejected); - - match (rej_rpt, request) { - (RejectionReport::RequestResponse, Some((req, comp))) => { - let decomp = Decompressor::from(comp); - let req_data = match decomp.decompress(req.payload) { - Ok(data) => data, - Err(err) => format!("- decompression failed({comp}): '{err}' -").into() - }; - - error!( - category = ES_REJ_RPT, - message = err_summary.msg, - error_code = err_summary.error_code, - response = ?response, - request = %String::from_utf8_lossy(&req_data), - ); - } - (RejectionReport::Stats, _) => { - error!( - category = ES_REJ_RPT, - message = err_summary.msg, - error_code = err_summary.error_code, - ); - } - _ => { - error!( - category = ES_REJ_RPT, - message = err_summary.msg, - error_code = err_summary.error_code, - response = ?response, - ); - } - }; -} - fn get_event_status( response: &Response, request: Option<(ElasticsearchRequest, Compression)>, rej_rpt: RejectionReport, - telemetry: Telemetry, + context: &ElasticsearchRejectionContext, ) -> EventStatus { let status = response.status(); if status.is_success() { - let body = String::from_utf8_lossy(response.body()); - if body.contains(response_frag("errors", "true").as_str()) { - emit_bad_response_error(response, request, rej_rpt, telemetry); + let body = response.body(); + if String::from_utf8_lossy(body).contains(response_frag("errors", "true").as_str()) { + let req = request.map(|(req, comp)| (req.payload, comp)); + emit_rejection_error(context, status.as_u16(), body, req, rej_rpt); EventStatus::Rejected } else { EventStatus::Delivered } } else if status.is_server_error() { - let rej_rpt = if rej_rpt == RejectionReport::RequestResponse { + let mode = if rej_rpt == RejectionReport::RequestResponse { RejectionReport::Response } else { rej_rpt }; - emit_bad_response_error(response, None, rej_rpt, telemetry); + emit_rejection_error(context, status.as_u16(), response.body(), None, mode); EventStatus::Errored } else { - emit_bad_response_error(response, request, rej_rpt, telemetry); + let req = request.map(|(req, comp)| (req.payload, comp)); + emit_rejection_error(context, status.as_u16(), response.body(), req, rej_rpt); EventStatus::Rejected } } @@ -366,7 +346,7 @@ mod tests { assert!(body.contains(response_frag("errors", "true").as_str())); assert_eq!( - err_summary(&res), + err_summary(res.status().as_u16(), res.body()), ErrSummary { error_code: "http_response_200".into(), msg: "Request contained errors (indexed: 259, rejected: 5).".to_string(), diff --git a/src/sinks/splunk_hec/common/service.rs b/src/sinks/splunk_hec/common/service.rs index dc22cb884..dd8dc5ba2 100644 --- a/src/sinks/splunk_hec/common/service.rs +++ b/src/sinks/splunk_hec/common/service.rs @@ -7,6 +7,7 @@ use bytes::Bytes; use futures_util::future::BoxFuture; use http::{HeaderName, Request, HeaderValue}; use indexmap::IndexMap; +use metrics::Counter; use serde::{Deserialize, Serialize}; use snafu::ResultExt; use tokio::sync::{mpsc, oneshot, OwnedSemaphorePermit, Semaphore}; @@ -25,16 +26,43 @@ use crate::{ internal_events::{SplunkIndexerAcknowledgementUnavailableError, SplunkResponseParseError}, sinks::{ splunk_hec::common::{build_uri, request::HecRequest, response::HecResponse}, - util::{sink::Response, Compression}, + util::{emit_rejection_error, sink::Response, Compression, RejectionContext, RejectionReport}, UriParseSnafu, }, }; +// #OBSERVO_STYLE_TELEMETRY# — see ElasticsearchService for rationale. +#[derive(Clone)] +pub struct Telemetry { + pub rejected: Counter, +} + +pub struct HecRejectionContext { + pub telemetry: Telemetry, +} + +impl RejectionContext for HecRejectionContext { + fn log_category(&self) -> &'static str { + "hec_rej_rpt" + } + + fn error_message(&self, status: u16, _body: &Bytes) -> String { + format!("Request rejected (status: {status}).") + } + + fn record_rejection(&self, _status: u16, _body: &Bytes) { + self.telemetry.rejected.increment(1); + } +} + pub struct HecService { pub inner: S, ack_finalizer_tx: Option)>>, ack_slots: PollSemaphore, current_ack_slot: Option, + rej_rpt: RejectionReport, + compression: Compression, + context: Arc, } #[derive(Deserialize, Serialize, Debug)] @@ -55,6 +83,9 @@ where ack_client: Option, http_request_builder: Arc, indexer_acknowledgements: HecClientAcknowledgementsConfig, + rej_rpt: RejectionReport, + compression: Compression, + context: Arc, ) -> Self { let max_pending_acks = indexer_acknowledgements.max_pending_acks.get(); let tx = if let Some(ack_client) = ack_client { @@ -76,6 +107,9 @@ where ack_finalizer_tx: tx, ack_slots, current_ack_slot: None, + rej_rpt, + compression, + context, } } } @@ -112,6 +146,14 @@ where fn call(&mut self, mut req: HecRequest) -> Self::Future { let ack_finalizer_tx = self.ack_finalizer_tx.clone(); let ack_slot = self.current_ack_slot.take(); + let rej_rpt = self.rej_rpt.clone(); + let compression = self.compression; + let context = Arc::clone(&self.context); + let req_for_rpt = if rej_rpt.needs_request() { + Some((req.body.clone(), compression)) + } else { + None + }; let metadata = std::mem::take(req.metadata_mut()); let events_count = metadata.event_count(); @@ -154,8 +196,15 @@ where EventStatus::Delivered } } else if response.is_transient() { + let mode = if rej_rpt == RejectionReport::RequestResponse { + RejectionReport::Response + } else { + rej_rpt + }; + emit_rejection_error(&*context, response.status_code(), response.body(), None, mode); EventStatus::Errored } else { + emit_rejection_error(&*context, response.status_code(), response.body(), req_for_rpt, rej_rpt); EventStatus::Rejected }; @@ -170,12 +219,17 @@ where pub trait ResponseExt { fn body(&self) -> &Bytes; + fn status_code(&self) -> u16; } impl ResponseExt for http::Response { fn body(&self) -> &Bytes { self.body() } + + fn status_code(&self) -> u16 { + self.status().as_u16() + } } pub struct HttpRequestBuilder { @@ -322,16 +376,24 @@ mod tests { }, build_http_batch_service, request::HecRequest, - service::{HecAckResponseBody, HecService, HttpRequestBuilder}, + service::{HecAckResponseBody, HecRejectionContext, HecService, HttpRequestBuilder, Telemetry}, EndpointTarget, }, - util::{metadata::RequestMetadataBuilder, Compression}, + util::{metadata::RequestMetadataBuilder, Compression, RejectionReport}, }, }; const TOKEN: &str = "token"; static ACK_ID: AtomicU64 = AtomicU64::new(0); + fn test_context() -> Arc { + Arc::new(HecRejectionContext { + telemetry: Telemetry { + rejected: metrics::counter!("hec_rejected_test"), + }, + }) + } + fn get_hec_service( endpoint: String, acknowledgements_config: HecClientAcknowledgementsConfig, @@ -357,6 +419,9 @@ mod tests { Some(client), http_request_builder, acknowledgements_config, + RejectionReport::default(), + Compression::default(), + test_context(), ) } @@ -679,6 +744,9 @@ mod tests { Some(client), http_request_builder, acknowledgements_config, + RejectionReport::default(), + Compression::default(), + test_context(), ); let request = get_hec_request(); @@ -726,6 +794,9 @@ mod tests { indexer_acknowledgements_enabled: false, ..Default::default() }, + RejectionReport::default(), + Compression::default(), + test_context(), ); let request = get_hec_request(); @@ -785,6 +856,9 @@ mod tests { indexer_acknowledgements_enabled: false, ..Default::default() }, + RejectionReport::default(), + Compression::default(), + test_context(), ); let request = get_hec_request(); diff --git a/src/sinks/splunk_hec/logs/config.rs b/src/sinks/splunk_hec/logs/config.rs index e5d96d380..bd486b0be 100644 --- a/src/sinks/splunk_hec/logs/config.rs +++ b/src/sinks/splunk_hec/logs/config.rs @@ -12,10 +12,10 @@ use crate::{ splunk_hec::common::{ acknowledgements::HecClientAcknowledgementsConfig, build_healthcheck, build_http_batch_service, create_client, - service::{HecService, HttpRequestBuilder}, + service::{HecRejectionContext, HecService, HttpRequestBuilder, Telemetry}, EndpointTarget, SplunkHecDefaultBatchSettings, }, - util::http::HttpRetryLogic, + util::{http::HttpRetryLogic, RejectionReport}, }, }; use crate::sinks::util::http::{validate_headers, RequestConfig}; @@ -189,6 +189,10 @@ pub struct HecLogsSinkConfig { #[configurable(derived)] #[serde(default = "default_timestamp_configuration")] pub timestamp_configuration: Option, + + /// Controls how much detail is logged when Splunk HEC rejects a batch. + #[serde(default)] + pub rejection_report: RejectionReport, } @@ -260,6 +264,7 @@ impl GenerateConfig for HecLogsSinkConfig { auto_extract_timestamp: None, endpoint_target: EndpointTarget::Event, timestamp_configuration: None, + rejection_report: RejectionReport::default(), }) .unwrap() } @@ -336,11 +341,23 @@ impl HecLogsSinkConfig { self.path.clone() )); + let context = Arc::new(HecRejectionContext { + telemetry: Telemetry { + rejected: metrics::counter!( + "hec_rejected", + "endpoint" => self.endpoint.clone(), + ), + }, + }); + let service = HecService::new( http_service, ack_client, http_request_builder, self.acknowledgements.clone(), + self.rejection_report.clone(), + self.compression, + context, ); let batch_settings = self.batch.into_batcher_settings()?; @@ -506,6 +523,7 @@ mod tests { auto_extract_timestamp: None, endpoint_target: EndpointTarget::Raw, timestamp_configuration: None, + rejection_report: RejectionReport::default(), }; let endpoint = format!("{endpoint}/services/collector/raw"); diff --git a/src/sinks/splunk_hec/logs/tests.rs b/src/sinks/splunk_hec/logs/tests.rs index 0b4445c1f..9688cc912 100644 --- a/src/sinks/splunk_hec/logs/tests.rs +++ b/src/sinks/splunk_hec/logs/tests.rs @@ -250,7 +250,8 @@ async fn splunk_passthrough_token() { path: None, auto_extract_timestamp: None, endpoint_target: EndpointTarget::Event, - timestamp_configuration: None + timestamp_configuration: None, + rejection_report: Default::default(), }; let cx = SinkContext::default(); @@ -763,6 +764,7 @@ async fn raw_endpoint_with_metadata_and_batch_headers() { auto_extract_timestamp: None, endpoint_target: EndpointTarget::Raw, timestamp_configuration: None, + rejection_report: Default::default(), }; let cx = SinkContext::default(); @@ -843,6 +845,7 @@ async fn raw_endpoint_with_only_batch_headers() { auto_extract_timestamp: None, endpoint_target: EndpointTarget::Raw, timestamp_configuration: None, + rejection_report: Default::default(), }; let cx = SinkContext::default(); @@ -904,6 +907,7 @@ async fn raw_endpoint_without_metadata_or_headers() { auto_extract_timestamp: None, endpoint_target: EndpointTarget::Raw, timestamp_configuration: None, + rejection_report: Default::default(), }; let cx = SinkContext::default(); @@ -957,6 +961,7 @@ async fn event_endpoint_with_two_batch_headers() { auto_extract_timestamp: None, endpoint_target: EndpointTarget::Event, timestamp_configuration: None, + rejection_report: Default::default(), }; let cx = SinkContext::default(); @@ -1031,6 +1036,7 @@ async fn event_endpoint_with_one_batch_header() { auto_extract_timestamp: None, endpoint_target: EndpointTarget::Event, timestamp_configuration: None, + rejection_report: Default::default(), }; let cx = SinkContext::default(); @@ -1090,6 +1096,7 @@ async fn event_endpoint_no_batching_on_metadata_fields() { auto_extract_timestamp: None, endpoint_target: EndpointTarget::Event, timestamp_configuration: None, + rejection_report: Default::default(), }; let cx = SinkContext::default(); @@ -1149,6 +1156,7 @@ async fn batch_headers_missing_value_separate_batch() { auto_extract_timestamp: None, endpoint_target: EndpointTarget::Event, timestamp_configuration: None, + rejection_report: Default::default(), }; let cx = SinkContext::default(); @@ -1213,6 +1221,7 @@ async fn batch_headers_static_headers_override() { auto_extract_timestamp: None, endpoint_target: EndpointTarget::Event, timestamp_configuration: None, + rejection_report: Default::default(), }; let cx = SinkContext::default(); diff --git a/src/sinks/splunk_hec/metrics/config.rs b/src/sinks/splunk_hec/metrics/config.rs index 11510864e..aa392fc23 100644 --- a/src/sinks/splunk_hec/metrics/config.rs +++ b/src/sinks/splunk_hec/metrics/config.rs @@ -15,11 +15,11 @@ use crate::{ splunk_hec::common::{ acknowledgements::HecClientAcknowledgementsConfig, build_healthcheck, build_http_batch_service, config_host_key, create_client, - service::{HecService, HttpRequestBuilder}, + service::{HecRejectionContext, HecService, HttpRequestBuilder, Telemetry}, EndpointTarget, SplunkHecDefaultBatchSettings, }, util::{ - http::HttpRetryLogic, BatchConfig, Compression, ServiceBuilderExt, + http::HttpRetryLogic, BatchConfig, Compression, RejectionReport, ServiceBuilderExt, }, Healthcheck, }, @@ -124,6 +124,10 @@ pub struct HecMetricsSinkConfig { #[configurable(derived)] #[serde(default)] pub path: Option, + + /// Controls how much detail is logged when Splunk HEC rejects a batch. + #[serde(default)] + pub rejection_report: RejectionReport, } impl GenerateConfig for HecMetricsSinkConfig { @@ -142,6 +146,7 @@ impl GenerateConfig for HecMetricsSinkConfig { tls: None, acknowledgements: Default::default(), path: None, + rejection_report: RejectionReport::default(), }) .unwrap() } @@ -203,11 +208,23 @@ impl HecMetricsSinkConfig { self.path.clone() )); + let context = Arc::new(HecRejectionContext { + telemetry: Telemetry { + rejected: metrics::counter!( + "hec_rejected", + "endpoint" => self.endpoint.clone(), + ), + }, + }); + let service = HecService::new( http_service, ack_client, http_request_builder, self.acknowledgements.clone(), + self.rejection_report.clone(), + self.compression, + context, ); let batch_settings = self.batch.into_batcher_settings()?; diff --git a/src/sinks/splunk_hec/metrics/tests.rs b/src/sinks/splunk_hec/metrics/tests.rs index b11dc25fa..3bc725527 100644 --- a/src/sinks/splunk_hec/metrics/tests.rs +++ b/src/sinks/splunk_hec/metrics/tests.rs @@ -331,7 +331,8 @@ async fn splunk_passthrough_token() { tls: None, acknowledgements: Default::default(), default_namespace: None, - path: None + path: None, + rejection_report: Default::default(), }; let cx = SinkContext::default(); diff --git a/src/sinks/util/mod.rs b/src/sinks/util/mod.rs index 4af68e03b..63bce66a1 100644 --- a/src/sinks/util/mod.rs +++ b/src/sinks/util/mod.rs @@ -15,6 +15,7 @@ pub mod normalizer; pub mod partitioner; pub mod processed_event; pub mod request_builder; +pub mod rejection_report; pub mod retries; pub mod service; pub mod sink; @@ -49,6 +50,7 @@ pub use jwt_auth::AuthTokenConfig; pub use jwt_auth::{AuthState, AuthToken}; pub use compressor::Compressor; pub use compressor::Decompressor; +pub use rejection_report::{emit_rejection_error, RejectionContext, RejectionReport}; pub use normalizer::Normalizer; pub use request_builder::{IncrementalRequestBuilder, RequestBuilder}; pub use service::{ diff --git a/src/sinks/util/rejection_report.rs b/src/sinks/util/rejection_report.rs new file mode 100644 index 000000000..ab46e511e --- /dev/null +++ b/src/sinks/util/rejection_report.rs @@ -0,0 +1,111 @@ +use bytes::Bytes; +use vector_lib::configurable::configurable_component; + +use super::{Compression, Decompressor}; + +/// Controls how much detail is logged when a sink's HTTP request is rejected. +#[configurable_component] +#[derive(Clone, Debug, Eq, PartialEq)] +#[serde(deny_unknown_fields, rename_all = "snake_case")] +pub enum RejectionReport { + /// Increment counters only; do not log request or response bodies. + #[serde(alias = "normal")] + Stats, + + /// Log the HTTP response body on rejection. + Response, + + /// Log both the request payload and the HTTP response body (may be large; + /// use smaller batch sizes when debugging with this mode). + RequestResponse, +} + +impl Default for RejectionReport { + fn default() -> Self { + Self::Stats + } +} + +impl RejectionReport { + /// `true` only for `RequestResponse` — the caller must clone request bytes before the send. + pub fn needs_request(&self) -> bool { + matches!(self, Self::RequestResponse) + } +} + +/// Sink-specific behaviour plugged into `emit_rejection_error`. +/// +/// Each sink implements this to provide its own counters, response parsing, +/// and log category. The generic `emit_rejection_error` handles the three +/// `RejectionReport` mode branches. +pub trait RejectionContext: Send + Sync { + /// Short category label emitted as a structured log field + /// (e.g. `"es_rej_rpt"`, `"hec_rej_rpt"`). + fn log_category(&self) -> &'static str; + + /// Human-readable error code string (default: `"http_response_"`). + fn error_code(&self, status: u16) -> String { + format!("http_response_{status}") + } + + /// Human-readable message describing the rejection. + fn error_message(&self, status: u16, body: &Bytes) -> String; + + /// Update sink-specific counters. Called once per rejection before logging. + fn record_rejection(&self, status: u16, body: &Bytes); +} + +/// Emit a structured error log for a rejected or errored HTTP response. +/// +/// Handles all three `RejectionReport` modes. `request` must be +/// `Some((compressed_body, compression))` when `mode` is `RequestResponse`; +/// pass `None` otherwise (or when the request body is unavailable, e.g. 5xx). +pub fn emit_rejection_error( + context: &C, + status: u16, + response_body: &Bytes, + request: Option<(Bytes, Compression)>, + mode: RejectionReport, +) { + context.record_rejection(status, response_body); + let error_code = context.error_code(status); + let message = context.error_message(status, response_body); + let category = context.log_category(); + let response_body_str = String::from_utf8_lossy(response_body); + + match (mode, request) { + (RejectionReport::RequestResponse, Some((body, comp))) => { + let decomp = Decompressor::from(comp); + let req_data = match decomp.decompress(body) { + Ok(data) => data, + Err(err) => format!("- decompression failed({comp}): '{err}' -").into(), + }; + error!( + category = category, + message = message, + error_code = error_code, + response_status = status, + response_body = %response_body_str, + request = %String::from_utf8_lossy(&req_data), + ); + } + (RejectionReport::Stats, _) => { + error!( + category = category, + message = message, + error_code = error_code, + ); + } + _ => { + // Covers `Response` mode and `RequestResponse` without a body + // (e.g. 5xx where the request payload is suppressed). + error!( + category = category, + message = message, + error_code = error_code, + response_status = status, + response_body = %response_body_str, + ); + } + } +} From cd0adca6f3c519c3ef2b08e0e2be66f85e7d184f Mon Sep 17 00:00:00 2001 From: akshayakumar-t Date: Mon, 15 Jun 2026 13:13:17 +0530 Subject: [PATCH 2/8] OBE-10327: add tests for rejection report infrastructure and HEC service rejection paths Co-Authored-By: Akshaya's Agent --- src/sinks/splunk_hec/common/service.rs | 113 ++++++++++++++++++++++++ src/sinks/util/rejection_report.rs | 116 +++++++++++++++++++++++++ 2 files changed, 229 insertions(+) diff --git a/src/sinks/splunk_hec/common/service.rs b/src/sinks/splunk_hec/common/service.rs index dd8dc5ba2..e24651ffa 100644 --- a/src/sinks/splunk_hec/common/service.rs +++ b/src/sinks/splunk_hec/common/service.rs @@ -383,6 +383,38 @@ mod tests { }, }; + fn get_hec_service_with_rejection_report( + endpoint: String, + rej_rpt: RejectionReport, + acknowledgements_config: HecClientAcknowledgementsConfig, + ) -> HecService, crate::Error>> { + let app_info = crate::app_info(); + let client = HttpClient::new(None, &ProxyConfig::default(), &app_info).unwrap(); + let http_request_builder = Arc::new(HttpRequestBuilder::new( + endpoint, + EndpointTarget::default(), + String::from(TOKEN), + Compression::default(), + IndexMap::default(), + )); + let http_service = build_http_batch_service( + client.clone(), + Arc::clone(&http_request_builder), + EndpointTarget::Event, + false, + None, + ); + HecService::new( + BoxService::new(http_service), + None, + http_request_builder, + acknowledgements_config, + rej_rpt, + Compression::default(), + test_context(), + ) + } + const TOKEN: &str = "token"; static ACK_ID: AtomicU64 = AtomicU64::new(0); @@ -865,6 +897,87 @@ mod tests { let response = service.ready().await.unwrap().call(request).await.unwrap(); assert_eq!(EventStatus::Delivered, response.event_status); } + + // --- Rejection / error status tests --- + + fn no_ack_config() -> HecClientAcknowledgementsConfig { + HecClientAcknowledgementsConfig { + indexer_acknowledgements_enabled: false, + ..Default::default() + } + } + + #[tokio::test] + async fn hec_service_4xx_returns_rejected() { + let mock_server = MockServer::start().await; + Mock::given(method("POST")) + .respond_with(ResponseTemplate::new(400).set_body_string(r#"{"text":"Invalid token","code":4}"#)) + .mount(&mock_server) + .await; + + let mut service = get_hec_service_with_rejection_report( + mock_server.uri(), + RejectionReport::Stats, + no_ack_config(), + ); + let response = service.ready().await.unwrap().call(get_hec_request()).await.unwrap(); + assert_eq!(EventStatus::Rejected, response.event_status); + } + + #[tokio::test] + async fn hec_service_5xx_returns_errored() { + let mock_server = MockServer::start().await; + Mock::given(method("POST")) + .respond_with(ResponseTemplate::new(503).set_body_string("Service Unavailable")) + .mount(&mock_server) + .await; + + let mut service = get_hec_service_with_rejection_report( + mock_server.uri(), + RejectionReport::Stats, + no_ack_config(), + ); + let response = service.ready().await.unwrap().call(get_hec_request()).await.unwrap(); + assert_eq!(EventStatus::Errored, response.event_status); + } + + // A 5xx with RequestResponse mode should still return Errored. + // Internally the mode is downgraded to Response (no request body in 5xx log) + // but the event status is unaffected. + #[tokio::test] + async fn hec_service_5xx_with_request_response_mode_still_errored() { + let mock_server = MockServer::start().await; + Mock::given(method("POST")) + .respond_with(ResponseTemplate::new(500).set_body_string("Internal Server Error")) + .mount(&mock_server) + .await; + + let mut service = get_hec_service_with_rejection_report( + mock_server.uri(), + RejectionReport::RequestResponse, + no_ack_config(), + ); + let response = service.ready().await.unwrap().call(get_hec_request()).await.unwrap(); + assert_eq!(EventStatus::Errored, response.event_status); + } + + // A 4xx with RequestResponse mode returns Rejected (not affected by the mode). + #[tokio::test] + async fn hec_service_4xx_with_request_response_mode_returns_rejected() { + let mock_server = MockServer::start().await; + Mock::given(method("POST")) + .respond_with(ResponseTemplate::new(403).set_body_string(r#"{"text":"Forbidden","code":6}"#)) + .mount(&mock_server) + .await; + + let mut service = get_hec_service_with_rejection_report( + mock_server.uri(), + RejectionReport::RequestResponse, + no_ack_config(), + ); + let response = service.ready().await.unwrap().call(get_hec_request()).await.unwrap(); + assert_eq!(EventStatus::Rejected, response.event_status); + } } diff --git a/src/sinks/util/rejection_report.rs b/src/sinks/util/rejection_report.rs index ab46e511e..c60aa6708 100644 --- a/src/sinks/util/rejection_report.rs +++ b/src/sinks/util/rejection_report.rs @@ -109,3 +109,119 @@ pub fn emit_rejection_error( } } } + +#[cfg(test)] +mod tests { + use std::sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }; + + use super::*; + + // --- RejectionReport enum behaviour --- + + #[test] + fn default_is_stats() { + assert_eq!(RejectionReport::default(), RejectionReport::Stats); + } + + #[test] + fn needs_request_is_true_only_for_request_response() { + assert!(!RejectionReport::Stats.needs_request()); + assert!(!RejectionReport::Response.needs_request()); + assert!(RejectionReport::RequestResponse.needs_request()); + } + + #[test] + fn serde_roundtrip() { + let cases: &[(&str, RejectionReport)] = &[ + (r#""stats""#, RejectionReport::Stats), + (r#""normal""#, RejectionReport::Stats), // alias + (r#""response""#, RejectionReport::Response), + (r#""request_response""#, RejectionReport::RequestResponse), + ]; + for (input, expected) in cases { + let parsed: RejectionReport = serde_json::from_str(input) + .unwrap_or_else(|_| panic!("failed to parse {input}")); + assert_eq!(&parsed, expected, "input={input}"); + } + // Serialize → Deserialize round-trip + for variant in [RejectionReport::Stats, RejectionReport::Response, RejectionReport::RequestResponse] { + let json = serde_json::to_string(&variant).unwrap(); + let back: RejectionReport = serde_json::from_str(&json).unwrap(); + assert_eq!(back, variant); + } + } + + // --- emit_rejection_error via mock context --- + + struct MockContext { + call_count: Arc, + } + + impl RejectionContext for MockContext { + fn log_category(&self) -> &'static str { + "test_cat" + } + + fn error_message(&self, status: u16, _body: &Bytes) -> String { + format!("test error {status}") + } + + fn record_rejection(&self, _status: u16, _body: &Bytes) { + self.call_count.fetch_add(1, Ordering::Relaxed); + } + } + + fn make_ctx() -> (MockContext, Arc) { + let count = Arc::new(AtomicU64::new(0)); + let ctx = MockContext { call_count: Arc::clone(&count) }; + (ctx, count) + } + + #[test] + fn record_rejection_called_once_per_invocation_for_every_mode() { + let body = Bytes::from("error body"); + + let (ctx, count) = make_ctx(); + emit_rejection_error(&ctx, 400, &body, None, RejectionReport::Stats); + assert_eq!(count.load(Ordering::Relaxed), 1); + + let (ctx, count) = make_ctx(); + emit_rejection_error(&ctx, 400, &body, None, RejectionReport::Response); + assert_eq!(count.load(Ordering::Relaxed), 1); + + let (ctx, count) = make_ctx(); + // RequestResponse without a request body falls back to response-only logging. + emit_rejection_error(&ctx, 400, &body, None, RejectionReport::RequestResponse); + assert_eq!(count.load(Ordering::Relaxed), 1); + + let (ctx, count) = make_ctx(); + let req = Bytes::from("request body"); + emit_rejection_error(&ctx, 400, &body, Some((req, Compression::None)), RejectionReport::RequestResponse); + assert_eq!(count.load(Ordering::Relaxed), 1); + } + + #[test] + fn request_response_mode_decompresses_uncompressed_body_without_panic() { + let (ctx, _) = make_ctx(); + let response_body = Bytes::from("response body"); + let request_body = Bytes::from("request payload"); + // Should complete without panicking; decompressor pass-through for Compression::None. + emit_rejection_error( + &ctx, + 400, + &response_body, + Some((request_body, Compression::None)), + RejectionReport::RequestResponse, + ); + } + + #[test] + fn error_code_default_impl_formats_status() { + let (ctx, _) = make_ctx(); + assert_eq!(ctx.error_code(400), "http_response_400"); + assert_eq!(ctx.error_code(503), "http_response_503"); + } +} From 983e3ae6df85e27baa530d3803c0a0c9165388a7 Mon Sep 17 00:00:00 2001 From: akshayakumar-t Date: Mon, 15 Jun 2026 17:44:24 +0530 Subject: [PATCH 3/8] OBE-10327: remove unnecessary comments from rejection report tests Co-Authored-By: Akshaya's Agent --- src/sinks/splunk_hec/common/service.rs | 6 ------ src/sinks/util/rejection_report.rs | 6 ------ 2 files changed, 12 deletions(-) diff --git a/src/sinks/splunk_hec/common/service.rs b/src/sinks/splunk_hec/common/service.rs index e24651ffa..5822fa5c5 100644 --- a/src/sinks/splunk_hec/common/service.rs +++ b/src/sinks/splunk_hec/common/service.rs @@ -898,8 +898,6 @@ mod tests { assert_eq!(EventStatus::Delivered, response.event_status); } - // --- Rejection / error status tests --- - fn no_ack_config() -> HecClientAcknowledgementsConfig { HecClientAcknowledgementsConfig { indexer_acknowledgements_enabled: false, @@ -941,9 +939,6 @@ mod tests { assert_eq!(EventStatus::Errored, response.event_status); } - // A 5xx with RequestResponse mode should still return Errored. - // Internally the mode is downgraded to Response (no request body in 5xx log) - // but the event status is unaffected. #[tokio::test] async fn hec_service_5xx_with_request_response_mode_still_errored() { let mock_server = MockServer::start().await; @@ -961,7 +956,6 @@ mod tests { assert_eq!(EventStatus::Errored, response.event_status); } - // A 4xx with RequestResponse mode returns Rejected (not affected by the mode). #[tokio::test] async fn hec_service_4xx_with_request_response_mode_returns_rejected() { let mock_server = MockServer::start().await; diff --git a/src/sinks/util/rejection_report.rs b/src/sinks/util/rejection_report.rs index c60aa6708..31cad11e6 100644 --- a/src/sinks/util/rejection_report.rs +++ b/src/sinks/util/rejection_report.rs @@ -119,8 +119,6 @@ mod tests { use super::*; - // --- RejectionReport enum behaviour --- - #[test] fn default_is_stats() { assert_eq!(RejectionReport::default(), RejectionReport::Stats); @@ -146,7 +144,6 @@ mod tests { .unwrap_or_else(|_| panic!("failed to parse {input}")); assert_eq!(&parsed, expected, "input={input}"); } - // Serialize → Deserialize round-trip for variant in [RejectionReport::Stats, RejectionReport::Response, RejectionReport::RequestResponse] { let json = serde_json::to_string(&variant).unwrap(); let back: RejectionReport = serde_json::from_str(&json).unwrap(); @@ -154,8 +151,6 @@ mod tests { } } - // --- emit_rejection_error via mock context --- - struct MockContext { call_count: Arc, } @@ -208,7 +203,6 @@ mod tests { let (ctx, _) = make_ctx(); let response_body = Bytes::from("response body"); let request_body = Bytes::from("request payload"); - // Should complete without panicking; decompressor pass-through for Compression::None. emit_rejection_error( &ctx, 400, From 54583921fee932a7b9480d388730689c20f259b8 Mon Sep 17 00:00:00 2001 From: akshayakumar-t Date: Mon, 15 Jun 2026 19:21:36 +0530 Subject: [PATCH 4/8] OBE-10327: remove log_category from RejectionContext trait MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Sinks no longer emit a category field in rejection logs — vector's own structured logging already identifies the component. The trait method is removed so implementations are not forced to provide one. Co-Authored-By: Akshaya's Agent --- src/sinks/elasticsearch/service.rs | 4 ---- src/sinks/splunk_hec/common/service.rs | 4 ---- src/sinks/util/rejection_report.rs | 17 ++--------------- 3 files changed, 2 insertions(+), 23 deletions(-) diff --git a/src/sinks/elasticsearch/service.rs b/src/sinks/elasticsearch/service.rs index 7155bff0b..c0283bae1 100644 --- a/src/sinks/elasticsearch/service.rs +++ b/src/sinks/elasticsearch/service.rs @@ -82,10 +82,6 @@ pub struct ElasticsearchRejectionContext { } impl RejectionContext for ElasticsearchRejectionContext { - fn log_category(&self) -> &'static str { - "es_rej_rpt" - } - fn error_code(&self, status: u16) -> String { format!("http_response_{status}") } diff --git a/src/sinks/splunk_hec/common/service.rs b/src/sinks/splunk_hec/common/service.rs index 5822fa5c5..f7ed0b749 100644 --- a/src/sinks/splunk_hec/common/service.rs +++ b/src/sinks/splunk_hec/common/service.rs @@ -42,10 +42,6 @@ pub struct HecRejectionContext { } impl RejectionContext for HecRejectionContext { - fn log_category(&self) -> &'static str { - "hec_rej_rpt" - } - fn error_message(&self, status: u16, _body: &Bytes) -> String { format!("Request rejected (status: {status}).") } diff --git a/src/sinks/util/rejection_report.rs b/src/sinks/util/rejection_report.rs index 31cad11e6..42fef17f9 100644 --- a/src/sinks/util/rejection_report.rs +++ b/src/sinks/util/rejection_report.rs @@ -35,14 +35,9 @@ impl RejectionReport { /// Sink-specific behaviour plugged into `emit_rejection_error`. /// -/// Each sink implements this to provide its own counters, response parsing, -/// and log category. The generic `emit_rejection_error` handles the three -/// `RejectionReport` mode branches. +/// Each sink implements this to provide its own counters and response parsing. +/// The generic `emit_rejection_error` handles the three `RejectionReport` mode branches. pub trait RejectionContext: Send + Sync { - /// Short category label emitted as a structured log field - /// (e.g. `"es_rej_rpt"`, `"hec_rej_rpt"`). - fn log_category(&self) -> &'static str; - /// Human-readable error code string (default: `"http_response_"`). fn error_code(&self, status: u16) -> String { format!("http_response_{status}") @@ -70,7 +65,6 @@ pub fn emit_rejection_error( context.record_rejection(status, response_body); let error_code = context.error_code(status); let message = context.error_message(status, response_body); - let category = context.log_category(); let response_body_str = String::from_utf8_lossy(response_body); match (mode, request) { @@ -81,7 +75,6 @@ pub fn emit_rejection_error( Err(err) => format!("- decompression failed({comp}): '{err}' -").into(), }; error!( - category = category, message = message, error_code = error_code, response_status = status, @@ -91,7 +84,6 @@ pub fn emit_rejection_error( } (RejectionReport::Stats, _) => { error!( - category = category, message = message, error_code = error_code, ); @@ -100,7 +92,6 @@ pub fn emit_rejection_error( // Covers `Response` mode and `RequestResponse` without a body // (e.g. 5xx where the request payload is suppressed). error!( - category = category, message = message, error_code = error_code, response_status = status, @@ -156,10 +147,6 @@ mod tests { } impl RejectionContext for MockContext { - fn log_category(&self) -> &'static str { - "test_cat" - } - fn error_message(&self, status: u16, _body: &Bytes) -> String { format!("test error {status}") } From e4448944c3d57a5b95a4f89870ec9fba03d4c2d9 Mon Sep 17 00:00:00 2001 From: akshayakumar-t Date: Mon, 15 Jun 2026 19:57:17 +0530 Subject: [PATCH 5/8] OBE-10327: address review feedback on HecRejectionContext - Inline rejected counter directly into HecRejectionContext, removing the Telemetry wrapper struct that collided with the ES Telemetry name - Parse Splunk JSON error body in error_message to surface the text field - Remove #OBSERVO_STYLE_TELEMETRY# cross-reference comment Co-Authored-By: Akshaya's Agent --- src/sinks/splunk_hec/common/service.rs | 26 ++++++++++++-------------- src/sinks/splunk_hec/logs/config.rs | 12 +++++------- src/sinks/splunk_hec/metrics/config.rs | 12 +++++------- 3 files changed, 22 insertions(+), 28 deletions(-) diff --git a/src/sinks/splunk_hec/common/service.rs b/src/sinks/splunk_hec/common/service.rs index f7ed0b749..4c9df0c4b 100644 --- a/src/sinks/splunk_hec/common/service.rs +++ b/src/sinks/splunk_hec/common/service.rs @@ -31,23 +31,23 @@ use crate::{ }, }; -// #OBSERVO_STYLE_TELEMETRY# — see ElasticsearchService for rationale. -#[derive(Clone)] -pub struct Telemetry { - pub rejected: Counter, -} - pub struct HecRejectionContext { - pub telemetry: Telemetry, + pub rejected: Counter, } impl RejectionContext for HecRejectionContext { - fn error_message(&self, status: u16, _body: &Bytes) -> String { - format!("Request rejected (status: {status}).") + fn error_message(&self, status: u16, body: &Bytes) -> String { + let splunk_text = serde_json::from_slice::(body) + .ok() + .and_then(|v| v.get("text").and_then(|t| t.as_str()).map(str::to_owned)); + match splunk_text { + Some(text) => format!("Request rejected (status: {status}): {text}."), + None => format!("Request rejected (status: {status})."), + } } fn record_rejection(&self, _status: u16, _body: &Bytes) { - self.telemetry.rejected.increment(1); + self.rejected.increment(1); } } @@ -372,7 +372,7 @@ mod tests { }, build_http_batch_service, request::HecRequest, - service::{HecAckResponseBody, HecRejectionContext, HecService, HttpRequestBuilder, Telemetry}, + service::{HecAckResponseBody, HecRejectionContext, HecService, HttpRequestBuilder}, EndpointTarget, }, util::{metadata::RequestMetadataBuilder, Compression, RejectionReport}, @@ -416,9 +416,7 @@ mod tests { fn test_context() -> Arc { Arc::new(HecRejectionContext { - telemetry: Telemetry { - rejected: metrics::counter!("hec_rejected_test"), - }, + rejected: metrics::counter!("hec_rejected_test"), }) } diff --git a/src/sinks/splunk_hec/logs/config.rs b/src/sinks/splunk_hec/logs/config.rs index bd486b0be..948b0fd57 100644 --- a/src/sinks/splunk_hec/logs/config.rs +++ b/src/sinks/splunk_hec/logs/config.rs @@ -12,7 +12,7 @@ use crate::{ splunk_hec::common::{ acknowledgements::HecClientAcknowledgementsConfig, build_healthcheck, build_http_batch_service, create_client, - service::{HecRejectionContext, HecService, HttpRequestBuilder, Telemetry}, + service::{HecRejectionContext, HecService, HttpRequestBuilder}, EndpointTarget, SplunkHecDefaultBatchSettings, }, util::{http::HttpRetryLogic, RejectionReport}, @@ -342,12 +342,10 @@ impl HecLogsSinkConfig { )); let context = Arc::new(HecRejectionContext { - telemetry: Telemetry { - rejected: metrics::counter!( - "hec_rejected", - "endpoint" => self.endpoint.clone(), - ), - }, + rejected: metrics::counter!( + "hec_rejected", + "endpoint" => self.endpoint.clone(), + ), }); let service = HecService::new( diff --git a/src/sinks/splunk_hec/metrics/config.rs b/src/sinks/splunk_hec/metrics/config.rs index aa392fc23..0f2a6bc9d 100644 --- a/src/sinks/splunk_hec/metrics/config.rs +++ b/src/sinks/splunk_hec/metrics/config.rs @@ -15,7 +15,7 @@ use crate::{ splunk_hec::common::{ acknowledgements::HecClientAcknowledgementsConfig, build_healthcheck, build_http_batch_service, config_host_key, create_client, - service::{HecRejectionContext, HecService, HttpRequestBuilder, Telemetry}, + service::{HecRejectionContext, HecService, HttpRequestBuilder}, EndpointTarget, SplunkHecDefaultBatchSettings, }, util::{ @@ -209,12 +209,10 @@ impl HecMetricsSinkConfig { )); let context = Arc::new(HecRejectionContext { - telemetry: Telemetry { - rejected: metrics::counter!( - "hec_rejected", - "endpoint" => self.endpoint.clone(), - ), - }, + rejected: metrics::counter!( + "hec_rejected", + "endpoint" => self.endpoint.clone(), + ), }); let service = HecService::new( From 5e122bb53ec30b8df9d827a4ef96e44ef3b29d68 Mon Sep 17 00:00:00 2001 From: akshayakumar-t Date: Mon, 15 Jun 2026 21:41:32 +0530 Subject: [PATCH 6/8] OBE-10327: add missing rejection_report field to HecLogsSinkConfig initialisers Co-Authored-By: Akshaya's Agent --- src/sinks/humio/logs.rs | 1 + src/sources/splunk_hec/mod.rs | 1 + 2 files changed, 2 insertions(+) diff --git a/src/sinks/humio/logs.rs b/src/sinks/humio/logs.rs index 065f6e045..223e2f0a1 100644 --- a/src/sinks/humio/logs.rs +++ b/src/sinks/humio/logs.rs @@ -220,6 +220,7 @@ impl HumioLogsConfig { preserve_timestamp_key: false, }), batch_headers: BatchHeaders::default(), + rejection_report: Default::default(), } } } diff --git a/src/sources/splunk_hec/mod.rs b/src/sources/splunk_hec/mod.rs index d313c34ce..17301e43c 100644 --- a/src/sources/splunk_hec/mod.rs +++ b/src/sources/splunk_hec/mod.rs @@ -1380,6 +1380,7 @@ mod tests { auto_extract_timestamp: None, endpoint_target: Default::default(), timestamp_configuration: None, + rejection_report: Default::default(), } .build(SinkContext::default()) .await From b2cf757cfd2d88811e5d276724015d391c335202 Mon Sep 17 00:00:00 2001 From: akshayakumar-t Date: Mon, 15 Jun 2026 21:44:45 +0530 Subject: [PATCH 7/8] OBE-10327: add missing fields to HEC integration test config helpers Co-Authored-By: Akshaya's Agent --- src/sinks/splunk_hec/logs/integration_tests.rs | 4 +++- src/sinks/splunk_hec/metrics/integration_tests.rs | 2 ++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/src/sinks/splunk_hec/logs/integration_tests.rs b/src/sinks/splunk_hec/logs/integration_tests.rs index eb8753ff8..d909a2a59 100644 --- a/src/sinks/splunk_hec/logs/integration_tests.rs +++ b/src/sinks/splunk_hec/logs/integration_tests.rs @@ -134,7 +134,9 @@ async fn config( auto_extract_timestamp: None, endpoint_target: EndpointTarget::Event, timestamp_configuration: None, - path: None + path: None, + batch_headers: Default::default(), + rejection_report: Default::default(), } } diff --git a/src/sinks/splunk_hec/metrics/integration_tests.rs b/src/sinks/splunk_hec/metrics/integration_tests.rs index 9111cd0a3..d0afcce47 100644 --- a/src/sinks/splunk_hec/metrics/integration_tests.rs +++ b/src/sinks/splunk_hec/metrics/integration_tests.rs @@ -46,6 +46,8 @@ async fn config() -> HecMetricsSinkConfig { request: TowerRequestConfig::default(), tls: None, acknowledgements: Default::default(), + path: None, + rejection_report: Default::default(), } } From 524d3d10dadd601e227b23f8fcccc2a31dfd545e Mon Sep 17 00:00:00 2001 From: akshayakumar-t Date: Tue, 16 Jun 2026 15:00:02 +0530 Subject: [PATCH 8/8] OBE-10327: address v2 review feedback MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Use typed SplunkErrorBody struct instead of serde_json::Value to parse the Splunk error body — avoids full allocation for one field - Replace &*context with context.as_ref() for consistency - Add unit tests for HecRejectionContext::error_message covering the text-field-present and fallback paths Co-Authored-By: Akshaya's Agent --- src/sinks/splunk_hec/common/service.rs | 33 ++++++++++++++++++++++---- 1 file changed, 28 insertions(+), 5 deletions(-) diff --git a/src/sinks/splunk_hec/common/service.rs b/src/sinks/splunk_hec/common/service.rs index 4c9df0c4b..8b76ab411 100644 --- a/src/sinks/splunk_hec/common/service.rs +++ b/src/sinks/splunk_hec/common/service.rs @@ -35,11 +35,16 @@ pub struct HecRejectionContext { pub rejected: Counter, } +#[derive(serde::Deserialize)] +struct SplunkErrorBody { + text: Option, +} + impl RejectionContext for HecRejectionContext { fn error_message(&self, status: u16, body: &Bytes) -> String { - let splunk_text = serde_json::from_slice::(body) + let splunk_text = serde_json::from_slice::(body) .ok() - .and_then(|v| v.get("text").and_then(|t| t.as_str()).map(str::to_owned)); + .and_then(|b| b.text); match splunk_text { Some(text) => format!("Request rejected (status: {status}): {text}."), None => format!("Request rejected (status: {status})."), @@ -197,10 +202,10 @@ where } else { rej_rpt }; - emit_rejection_error(&*context, response.status_code(), response.body(), None, mode); + emit_rejection_error(context.as_ref(), response.status_code(), response.body(), None, mode); EventStatus::Errored } else { - emit_rejection_error(&*context, response.status_code(), response.body(), req_for_rpt, rej_rpt); + emit_rejection_error(context.as_ref(), response.status_code(), response.body(), req_for_rpt, rej_rpt); EventStatus::Rejected }; @@ -375,7 +380,7 @@ mod tests { service::{HecAckResponseBody, HecRejectionContext, HecService, HttpRequestBuilder}, EndpointTarget, }, - util::{metadata::RequestMetadataBuilder, Compression, RejectionReport}, + util::{metadata::RequestMetadataBuilder, Compression, RejectionContext, RejectionReport}, }, }; @@ -420,6 +425,24 @@ mod tests { }) } + #[test] + fn error_message_extracts_splunk_text_field() { + let ctx = HecRejectionContext { rejected: metrics::counter!("_test") }; + assert_eq!( + ctx.error_message(400, &Bytes::from(r#"{"text":"Invalid token","code":4}"#)), + "Request rejected (status: 400): Invalid token." + ); + } + + #[test] + fn error_message_falls_back_to_status_when_no_text_field() { + let ctx = HecRejectionContext { rejected: metrics::counter!("_test") }; + assert_eq!( + ctx.error_message(503, &Bytes::from("Service Unavailable")), + "Request rejected (status: 503)." + ); + } + fn get_hec_service( endpoint: String, acknowledgements_config: HecClientAcknowledgementsConfig,