diff --git a/src/sinks/elasticsearch/config.rs b/src/sinks/elasticsearch/config.rs index 9dc1a9282..b9f48ef00 100644 --- a/src/sinks/elasticsearch/config.rs +++ b/src/sinks/elasticsearch/config.rs @@ -16,7 +16,7 @@ use crate::{ elasticsearch::{ health::ElasticsearchHealthLogic, retry::ElasticsearchRetryLogic, - service::{ElasticsearchService, HttpRequestBuilder, Telemetry}, + service::{ElasticsearchRejectionContext, ElasticsearchService, HttpRequestBuilder, Telemetry}, sink::ElasticsearchSink, ElasticsearchApiVersion, ElasticsearchAuthConfig, ElasticsearchCommon, ElasticsearchCommonMode, ElasticsearchMode, RejectionReport, VersionType, @@ -573,6 +573,7 @@ impl SinkConfig for ElasticsearchConfig { "end_pt" => endpt_str, ), }; + let context = std::sync::Arc::new(ElasticsearchRejectionContext { telemetry }); let http_request_builder = HttpRequestBuilder::new(&common, self); let service = ElasticsearchService::new( @@ -580,7 +581,7 @@ impl SinkConfig for ElasticsearchConfig { http_request_builder, self.rejection_report.clone(), self.compression.clone(), - telemetry); + context); (endpoint, service) }) diff --git a/src/sinks/elasticsearch/mod.rs b/src/sinks/elasticsearch/mod.rs index 0ae67d00c..dfa2977c9 100644 --- a/src/sinks/elasticsearch/mod.rs +++ b/src/sinks/elasticsearch/mod.rs @@ -77,37 +77,7 @@ impl Default for ElasticsearchMode { } } -/// Elasticsearch rejection reporting mode. -#[configurable_component] -#[derive(Clone, Debug, Eq, PartialEq)] -#[serde(deny_unknown_fields, rename_all = "snake_case")] -pub enum RejectionReport { - /// Do not print rejection response, only increment stats - #[serde(alias = "normal")] - Stats, - - /// Report entire response, the response may be large and likely not too useful - Response, - - /// Report entire request and response may be huge, with smaller batch-sizes this - /// can be useful to debug issues - RequestResponse, -} - -impl Default for RejectionReport { - fn default() -> Self { - Self::Stats - } -} - -impl RejectionReport { - pub fn needs_request(&self) -> bool { - match self { - Self::RequestResponse => true, - _ => false, - } - } -} +pub use crate::sinks::util::RejectionReport; /// Bulk API actions. #[configurable_component] diff --git a/src/sinks/elasticsearch/service.rs b/src/sinks/elasticsearch/service.rs index 377805b57..c0283bae1 100644 --- a/src/sinks/elasticsearch/service.rs +++ b/src/sinks/elasticsearch/service.rs @@ -24,7 +24,7 @@ use crate::{ sinks::util::{ auth::Auth, http::{HttpBatchService, RequestConfig}, - Compression, ElementCount, Decompressor, + Compression, ElementCount, emit_rejection_error, RejectionContext, }, }; @@ -77,6 +77,26 @@ pub struct Telemetry { pub indexed: Counter, } +pub struct ElasticsearchRejectionContext { + pub telemetry: Telemetry, +} + +impl RejectionContext for ElasticsearchRejectionContext { + fn error_code(&self, status: u16) -> String { + format!("http_response_{status}") + } + + fn error_message(&self, status: u16, body: &Bytes) -> String { + err_summary(status, body).msg + } + + fn record_rejection(&self, status: u16, body: &Bytes) { + let s = err_summary(status, body); + self.telemetry.rejected.increment(s.rejected); + self.telemetry.indexed.increment(s.indexed); + } +} + #[derive(Clone)] pub struct ElasticsearchService { // TODO: `HttpBatchService` has been deprecated for direct use in sinks. @@ -88,7 +108,7 @@ pub struct ElasticsearchService { >, rej_rpt: RejectionReport, compression: Compression, - telemetry: Telemetry, + context: Arc, } impl ElasticsearchService { @@ -97,7 +117,7 @@ impl ElasticsearchService { http_request_builder: HttpRequestBuilder, rej_rpt: RejectionReport, compression: Compression, - telemetry: Telemetry, + context: Arc, ) -> ElasticsearchService { let http_request_builder = Arc::new(http_request_builder); let batch_service = HttpBatchService::new(http_client, move |req| { @@ -106,7 +126,7 @@ impl ElasticsearchService { Box::pin(async move { request_builder.build_request(req).await }); future }); - ElasticsearchService { batch_service, rej_rpt, compression, telemetry } + ElasticsearchService { batch_service, rej_rpt, compression, context } } } @@ -213,14 +233,14 @@ impl Service for ElasticsearchService { } else { None }; - let telemetry = self.telemetry.clone(); + let context = Arc::clone(&self.context); Box::pin(async move { http_service.ready().await?; let events_byte_size = std::mem::take(req.metadata_mut()).into_events_estimated_json_encoded_byte_size(); let http_response = http_service.call(req).await?; - let event_status = get_event_status(&http_response, req_for_rpt, rej_rpt, telemetry); + let event_status = get_event_status(&http_response, req_for_rpt, rej_rpt, &context); Ok(ElasticsearchResponse { event_status, http_response, @@ -230,8 +250,6 @@ impl Service for ElasticsearchService { } } -const ES_REJ_RPT: &str = "es_rej_rpt"; - fn response_frag(key: &str, val_prefix: &str) -> String { format!("\"{key}\":{val_prefix}") } @@ -244,97 +262,55 @@ struct ErrSummary { rejected: u64, } -fn err_summary(response: &Response) -> ErrSummary { - let body = String::from_utf8_lossy(response.body()); - let i = - body +fn err_summary(status: u16, body: &Bytes) -> ErrSummary { + let body_str = String::from_utf8_lossy(body); + let i: u64 = + body_str .match_indices(response_frag("status", "201").as_str()) .count() .try_into() .unwrap(); - let r = - body + let r: u64 = + body_str .match_indices(response_frag("status", "400").as_str()) .count() .try_into() .unwrap(); ErrSummary { - error_code: format!("http_response_{}", response.status().as_u16()), + error_code: format!("http_response_{status}"), msg: format!("Request contained errors (indexed: {i}, rejected: {r})."), indexed: i, rejected: r } } -fn emit_bad_response_error( - response: &Response, - request: Option<(ElasticsearchRequest, Compression)>, - rej_rpt: RejectionReport, - telemetry: Telemetry, -) { - let err_summary = err_summary(response); - telemetry.indexed.increment(err_summary.indexed); - telemetry.rejected.increment(err_summary.rejected); - - match (rej_rpt, request) { - (RejectionReport::RequestResponse, Some((req, comp))) => { - let decomp = Decompressor::from(comp); - let req_data = match decomp.decompress(req.payload) { - Ok(data) => data, - Err(err) => format!("- decompression failed({comp}): '{err}' -").into() - }; - - error!( - category = ES_REJ_RPT, - message = err_summary.msg, - error_code = err_summary.error_code, - response = ?response, - request = %String::from_utf8_lossy(&req_data), - ); - } - (RejectionReport::Stats, _) => { - error!( - category = ES_REJ_RPT, - message = err_summary.msg, - error_code = err_summary.error_code, - ); - } - _ => { - error!( - category = ES_REJ_RPT, - message = err_summary.msg, - error_code = err_summary.error_code, - response = ?response, - ); - } - }; -} - fn get_event_status( response: &Response, request: Option<(ElasticsearchRequest, Compression)>, rej_rpt: RejectionReport, - telemetry: Telemetry, + context: &ElasticsearchRejectionContext, ) -> EventStatus { let status = response.status(); if status.is_success() { - let body = String::from_utf8_lossy(response.body()); - if body.contains(response_frag("errors", "true").as_str()) { - emit_bad_response_error(response, request, rej_rpt, telemetry); + let body = response.body(); + if String::from_utf8_lossy(body).contains(response_frag("errors", "true").as_str()) { + let req = request.map(|(req, comp)| (req.payload, comp)); + emit_rejection_error(context, status.as_u16(), body, req, rej_rpt); EventStatus::Rejected } else { EventStatus::Delivered } } else if status.is_server_error() { - let rej_rpt = if rej_rpt == RejectionReport::RequestResponse { + let mode = if rej_rpt == RejectionReport::RequestResponse { RejectionReport::Response } else { rej_rpt }; - emit_bad_response_error(response, None, rej_rpt, telemetry); + emit_rejection_error(context, status.as_u16(), response.body(), None, mode); EventStatus::Errored } else { - emit_bad_response_error(response, request, rej_rpt, telemetry); + let req = request.map(|(req, comp)| (req.payload, comp)); + emit_rejection_error(context, status.as_u16(), response.body(), req, rej_rpt); EventStatus::Rejected } } @@ -366,7 +342,7 @@ mod tests { assert!(body.contains(response_frag("errors", "true").as_str())); assert_eq!( - err_summary(&res), + err_summary(res.status().as_u16(), res.body()), ErrSummary { error_code: "http_response_200".into(), msg: "Request contained errors (indexed: 259, rejected: 5).".to_string(), diff --git a/src/sinks/humio/logs.rs b/src/sinks/humio/logs.rs index 065f6e045..223e2f0a1 100644 --- a/src/sinks/humio/logs.rs +++ b/src/sinks/humio/logs.rs @@ -220,6 +220,7 @@ impl HumioLogsConfig { preserve_timestamp_key: false, }), batch_headers: BatchHeaders::default(), + rejection_report: Default::default(), } } } diff --git a/src/sinks/splunk_hec/common/service.rs b/src/sinks/splunk_hec/common/service.rs index dc22cb884..8b76ab411 100644 --- a/src/sinks/splunk_hec/common/service.rs +++ b/src/sinks/splunk_hec/common/service.rs @@ -7,6 +7,7 @@ use bytes::Bytes; use futures_util::future::BoxFuture; use http::{HeaderName, Request, HeaderValue}; use indexmap::IndexMap; +use metrics::Counter; use serde::{Deserialize, Serialize}; use snafu::ResultExt; use tokio::sync::{mpsc, oneshot, OwnedSemaphorePermit, Semaphore}; @@ -25,16 +26,44 @@ use crate::{ internal_events::{SplunkIndexerAcknowledgementUnavailableError, SplunkResponseParseError}, sinks::{ splunk_hec::common::{build_uri, request::HecRequest, response::HecResponse}, - util::{sink::Response, Compression}, + util::{emit_rejection_error, sink::Response, Compression, RejectionContext, RejectionReport}, UriParseSnafu, }, }; +pub struct HecRejectionContext { + pub rejected: Counter, +} + +#[derive(serde::Deserialize)] +struct SplunkErrorBody { + text: Option, +} + +impl RejectionContext for HecRejectionContext { + fn error_message(&self, status: u16, body: &Bytes) -> String { + let splunk_text = serde_json::from_slice::(body) + .ok() + .and_then(|b| b.text); + match splunk_text { + Some(text) => format!("Request rejected (status: {status}): {text}."), + None => format!("Request rejected (status: {status})."), + } + } + + fn record_rejection(&self, _status: u16, _body: &Bytes) { + self.rejected.increment(1); + } +} + pub struct HecService { pub inner: S, ack_finalizer_tx: Option)>>, ack_slots: PollSemaphore, current_ack_slot: Option, + rej_rpt: RejectionReport, + compression: Compression, + context: Arc, } #[derive(Deserialize, Serialize, Debug)] @@ -55,6 +84,9 @@ where ack_client: Option, http_request_builder: Arc, indexer_acknowledgements: HecClientAcknowledgementsConfig, + rej_rpt: RejectionReport, + compression: Compression, + context: Arc, ) -> Self { let max_pending_acks = indexer_acknowledgements.max_pending_acks.get(); let tx = if let Some(ack_client) = ack_client { @@ -76,6 +108,9 @@ where ack_finalizer_tx: tx, ack_slots, current_ack_slot: None, + rej_rpt, + compression, + context, } } } @@ -112,6 +147,14 @@ where fn call(&mut self, mut req: HecRequest) -> Self::Future { let ack_finalizer_tx = self.ack_finalizer_tx.clone(); let ack_slot = self.current_ack_slot.take(); + let rej_rpt = self.rej_rpt.clone(); + let compression = self.compression; + let context = Arc::clone(&self.context); + let req_for_rpt = if rej_rpt.needs_request() { + Some((req.body.clone(), compression)) + } else { + None + }; let metadata = std::mem::take(req.metadata_mut()); let events_count = metadata.event_count(); @@ -154,8 +197,15 @@ where EventStatus::Delivered } } else if response.is_transient() { + let mode = if rej_rpt == RejectionReport::RequestResponse { + RejectionReport::Response + } else { + rej_rpt + }; + emit_rejection_error(context.as_ref(), response.status_code(), response.body(), None, mode); EventStatus::Errored } else { + emit_rejection_error(context.as_ref(), response.status_code(), response.body(), req_for_rpt, rej_rpt); EventStatus::Rejected }; @@ -170,12 +220,17 @@ where pub trait ResponseExt { fn body(&self) -> &Bytes; + fn status_code(&self) -> u16; } impl ResponseExt for http::Response { fn body(&self) -> &Bytes { self.body() } + + fn status_code(&self) -> u16 { + self.status().as_u16() + } } pub struct HttpRequestBuilder { @@ -322,16 +377,72 @@ mod tests { }, build_http_batch_service, request::HecRequest, - service::{HecAckResponseBody, HecService, HttpRequestBuilder}, + service::{HecAckResponseBody, HecRejectionContext, HecService, HttpRequestBuilder}, EndpointTarget, }, - util::{metadata::RequestMetadataBuilder, Compression}, + util::{metadata::RequestMetadataBuilder, Compression, RejectionContext, RejectionReport}, }, }; + fn get_hec_service_with_rejection_report( + endpoint: String, + rej_rpt: RejectionReport, + acknowledgements_config: HecClientAcknowledgementsConfig, + ) -> HecService, crate::Error>> { + let app_info = crate::app_info(); + let client = HttpClient::new(None, &ProxyConfig::default(), &app_info).unwrap(); + let http_request_builder = Arc::new(HttpRequestBuilder::new( + endpoint, + EndpointTarget::default(), + String::from(TOKEN), + Compression::default(), + IndexMap::default(), + )); + let http_service = build_http_batch_service( + client.clone(), + Arc::clone(&http_request_builder), + EndpointTarget::Event, + false, + None, + ); + HecService::new( + BoxService::new(http_service), + None, + http_request_builder, + acknowledgements_config, + rej_rpt, + Compression::default(), + test_context(), + ) + } + const TOKEN: &str = "token"; static ACK_ID: AtomicU64 = AtomicU64::new(0); + fn test_context() -> Arc { + Arc::new(HecRejectionContext { + rejected: metrics::counter!("hec_rejected_test"), + }) + } + + #[test] + fn error_message_extracts_splunk_text_field() { + let ctx = HecRejectionContext { rejected: metrics::counter!("_test") }; + assert_eq!( + ctx.error_message(400, &Bytes::from(r#"{"text":"Invalid token","code":4}"#)), + "Request rejected (status: 400): Invalid token." + ); + } + + #[test] + fn error_message_falls_back_to_status_when_no_text_field() { + let ctx = HecRejectionContext { rejected: metrics::counter!("_test") }; + assert_eq!( + ctx.error_message(503, &Bytes::from("Service Unavailable")), + "Request rejected (status: 503)." + ); + } + fn get_hec_service( endpoint: String, acknowledgements_config: HecClientAcknowledgementsConfig, @@ -357,6 +468,9 @@ mod tests { Some(client), http_request_builder, acknowledgements_config, + RejectionReport::default(), + Compression::default(), + test_context(), ) } @@ -679,6 +793,9 @@ mod tests { Some(client), http_request_builder, acknowledgements_config, + RejectionReport::default(), + Compression::default(), + test_context(), ); let request = get_hec_request(); @@ -726,6 +843,9 @@ mod tests { indexer_acknowledgements_enabled: false, ..Default::default() }, + RejectionReport::default(), + Compression::default(), + test_context(), ); let request = get_hec_request(); @@ -785,12 +905,90 @@ mod tests { indexer_acknowledgements_enabled: false, ..Default::default() }, + RejectionReport::default(), + Compression::default(), + test_context(), ); let request = get_hec_request(); let response = service.ready().await.unwrap().call(request).await.unwrap(); assert_eq!(EventStatus::Delivered, response.event_status); } + + fn no_ack_config() -> HecClientAcknowledgementsConfig { + HecClientAcknowledgementsConfig { + indexer_acknowledgements_enabled: false, + ..Default::default() + } + } + + #[tokio::test] + async fn hec_service_4xx_returns_rejected() { + let mock_server = MockServer::start().await; + Mock::given(method("POST")) + .respond_with(ResponseTemplate::new(400).set_body_string(r#"{"text":"Invalid token","code":4}"#)) + .mount(&mock_server) + .await; + + let mut service = get_hec_service_with_rejection_report( + mock_server.uri(), + RejectionReport::Stats, + no_ack_config(), + ); + let response = service.ready().await.unwrap().call(get_hec_request()).await.unwrap(); + assert_eq!(EventStatus::Rejected, response.event_status); + } + + #[tokio::test] + async fn hec_service_5xx_returns_errored() { + let mock_server = MockServer::start().await; + Mock::given(method("POST")) + .respond_with(ResponseTemplate::new(503).set_body_string("Service Unavailable")) + .mount(&mock_server) + .await; + + let mut service = get_hec_service_with_rejection_report( + mock_server.uri(), + RejectionReport::Stats, + no_ack_config(), + ); + let response = service.ready().await.unwrap().call(get_hec_request()).await.unwrap(); + assert_eq!(EventStatus::Errored, response.event_status); + } + + #[tokio::test] + async fn hec_service_5xx_with_request_response_mode_still_errored() { + let mock_server = MockServer::start().await; + Mock::given(method("POST")) + .respond_with(ResponseTemplate::new(500).set_body_string("Internal Server Error")) + .mount(&mock_server) + .await; + + let mut service = get_hec_service_with_rejection_report( + mock_server.uri(), + RejectionReport::RequestResponse, + no_ack_config(), + ); + let response = service.ready().await.unwrap().call(get_hec_request()).await.unwrap(); + assert_eq!(EventStatus::Errored, response.event_status); + } + + #[tokio::test] + async fn hec_service_4xx_with_request_response_mode_returns_rejected() { + let mock_server = MockServer::start().await; + Mock::given(method("POST")) + .respond_with(ResponseTemplate::new(403).set_body_string(r#"{"text":"Forbidden","code":6}"#)) + .mount(&mock_server) + .await; + + let mut service = get_hec_service_with_rejection_report( + mock_server.uri(), + RejectionReport::RequestResponse, + no_ack_config(), + ); + let response = service.ready().await.unwrap().call(get_hec_request()).await.unwrap(); + assert_eq!(EventStatus::Rejected, response.event_status); + } } diff --git a/src/sinks/splunk_hec/logs/config.rs b/src/sinks/splunk_hec/logs/config.rs index e5d96d380..948b0fd57 100644 --- a/src/sinks/splunk_hec/logs/config.rs +++ b/src/sinks/splunk_hec/logs/config.rs @@ -12,10 +12,10 @@ use crate::{ splunk_hec::common::{ acknowledgements::HecClientAcknowledgementsConfig, build_healthcheck, build_http_batch_service, create_client, - service::{HecService, HttpRequestBuilder}, + service::{HecRejectionContext, HecService, HttpRequestBuilder}, EndpointTarget, SplunkHecDefaultBatchSettings, }, - util::http::HttpRetryLogic, + util::{http::HttpRetryLogic, RejectionReport}, }, }; use crate::sinks::util::http::{validate_headers, RequestConfig}; @@ -189,6 +189,10 @@ pub struct HecLogsSinkConfig { #[configurable(derived)] #[serde(default = "default_timestamp_configuration")] pub timestamp_configuration: Option, + + /// Controls how much detail is logged when Splunk HEC rejects a batch. + #[serde(default)] + pub rejection_report: RejectionReport, } @@ -260,6 +264,7 @@ impl GenerateConfig for HecLogsSinkConfig { auto_extract_timestamp: None, endpoint_target: EndpointTarget::Event, timestamp_configuration: None, + rejection_report: RejectionReport::default(), }) .unwrap() } @@ -336,11 +341,21 @@ impl HecLogsSinkConfig { self.path.clone() )); + let context = Arc::new(HecRejectionContext { + rejected: metrics::counter!( + "hec_rejected", + "endpoint" => self.endpoint.clone(), + ), + }); + let service = HecService::new( http_service, ack_client, http_request_builder, self.acknowledgements.clone(), + self.rejection_report.clone(), + self.compression, + context, ); let batch_settings = self.batch.into_batcher_settings()?; @@ -506,6 +521,7 @@ mod tests { auto_extract_timestamp: None, endpoint_target: EndpointTarget::Raw, timestamp_configuration: None, + rejection_report: RejectionReport::default(), }; let endpoint = format!("{endpoint}/services/collector/raw"); diff --git a/src/sinks/splunk_hec/logs/integration_tests.rs b/src/sinks/splunk_hec/logs/integration_tests.rs index eb8753ff8..d909a2a59 100644 --- a/src/sinks/splunk_hec/logs/integration_tests.rs +++ b/src/sinks/splunk_hec/logs/integration_tests.rs @@ -134,7 +134,9 @@ async fn config( auto_extract_timestamp: None, endpoint_target: EndpointTarget::Event, timestamp_configuration: None, - path: None + path: None, + batch_headers: Default::default(), + rejection_report: Default::default(), } } diff --git a/src/sinks/splunk_hec/logs/tests.rs b/src/sinks/splunk_hec/logs/tests.rs index 0b4445c1f..9688cc912 100644 --- a/src/sinks/splunk_hec/logs/tests.rs +++ b/src/sinks/splunk_hec/logs/tests.rs @@ -250,7 +250,8 @@ async fn splunk_passthrough_token() { path: None, auto_extract_timestamp: None, endpoint_target: EndpointTarget::Event, - timestamp_configuration: None + timestamp_configuration: None, + rejection_report: Default::default(), }; let cx = SinkContext::default(); @@ -763,6 +764,7 @@ async fn raw_endpoint_with_metadata_and_batch_headers() { auto_extract_timestamp: None, endpoint_target: EndpointTarget::Raw, timestamp_configuration: None, + rejection_report: Default::default(), }; let cx = SinkContext::default(); @@ -843,6 +845,7 @@ async fn raw_endpoint_with_only_batch_headers() { auto_extract_timestamp: None, endpoint_target: EndpointTarget::Raw, timestamp_configuration: None, + rejection_report: Default::default(), }; let cx = SinkContext::default(); @@ -904,6 +907,7 @@ async fn raw_endpoint_without_metadata_or_headers() { auto_extract_timestamp: None, endpoint_target: EndpointTarget::Raw, timestamp_configuration: None, + rejection_report: Default::default(), }; let cx = SinkContext::default(); @@ -957,6 +961,7 @@ async fn event_endpoint_with_two_batch_headers() { auto_extract_timestamp: None, endpoint_target: EndpointTarget::Event, timestamp_configuration: None, + rejection_report: Default::default(), }; let cx = SinkContext::default(); @@ -1031,6 +1036,7 @@ async fn event_endpoint_with_one_batch_header() { auto_extract_timestamp: None, endpoint_target: EndpointTarget::Event, timestamp_configuration: None, + rejection_report: Default::default(), }; let cx = SinkContext::default(); @@ -1090,6 +1096,7 @@ async fn event_endpoint_no_batching_on_metadata_fields() { auto_extract_timestamp: None, endpoint_target: EndpointTarget::Event, timestamp_configuration: None, + rejection_report: Default::default(), }; let cx = SinkContext::default(); @@ -1149,6 +1156,7 @@ async fn batch_headers_missing_value_separate_batch() { auto_extract_timestamp: None, endpoint_target: EndpointTarget::Event, timestamp_configuration: None, + rejection_report: Default::default(), }; let cx = SinkContext::default(); @@ -1213,6 +1221,7 @@ async fn batch_headers_static_headers_override() { auto_extract_timestamp: None, endpoint_target: EndpointTarget::Event, timestamp_configuration: None, + rejection_report: Default::default(), }; let cx = SinkContext::default(); diff --git a/src/sinks/splunk_hec/metrics/config.rs b/src/sinks/splunk_hec/metrics/config.rs index 11510864e..0f2a6bc9d 100644 --- a/src/sinks/splunk_hec/metrics/config.rs +++ b/src/sinks/splunk_hec/metrics/config.rs @@ -15,11 +15,11 @@ use crate::{ splunk_hec::common::{ acknowledgements::HecClientAcknowledgementsConfig, build_healthcheck, build_http_batch_service, config_host_key, create_client, - service::{HecService, HttpRequestBuilder}, + service::{HecRejectionContext, HecService, HttpRequestBuilder}, EndpointTarget, SplunkHecDefaultBatchSettings, }, util::{ - http::HttpRetryLogic, BatchConfig, Compression, ServiceBuilderExt, + http::HttpRetryLogic, BatchConfig, Compression, RejectionReport, ServiceBuilderExt, }, Healthcheck, }, @@ -124,6 +124,10 @@ pub struct HecMetricsSinkConfig { #[configurable(derived)] #[serde(default)] pub path: Option, + + /// Controls how much detail is logged when Splunk HEC rejects a batch. + #[serde(default)] + pub rejection_report: RejectionReport, } impl GenerateConfig for HecMetricsSinkConfig { @@ -142,6 +146,7 @@ impl GenerateConfig for HecMetricsSinkConfig { tls: None, acknowledgements: Default::default(), path: None, + rejection_report: RejectionReport::default(), }) .unwrap() } @@ -203,11 +208,21 @@ impl HecMetricsSinkConfig { self.path.clone() )); + let context = Arc::new(HecRejectionContext { + rejected: metrics::counter!( + "hec_rejected", + "endpoint" => self.endpoint.clone(), + ), + }); + let service = HecService::new( http_service, ack_client, http_request_builder, self.acknowledgements.clone(), + self.rejection_report.clone(), + self.compression, + context, ); let batch_settings = self.batch.into_batcher_settings()?; diff --git a/src/sinks/splunk_hec/metrics/integration_tests.rs b/src/sinks/splunk_hec/metrics/integration_tests.rs index 9111cd0a3..d0afcce47 100644 --- a/src/sinks/splunk_hec/metrics/integration_tests.rs +++ b/src/sinks/splunk_hec/metrics/integration_tests.rs @@ -46,6 +46,8 @@ async fn config() -> HecMetricsSinkConfig { request: TowerRequestConfig::default(), tls: None, acknowledgements: Default::default(), + path: None, + rejection_report: Default::default(), } } diff --git a/src/sinks/splunk_hec/metrics/tests.rs b/src/sinks/splunk_hec/metrics/tests.rs index b11dc25fa..3bc725527 100644 --- a/src/sinks/splunk_hec/metrics/tests.rs +++ b/src/sinks/splunk_hec/metrics/tests.rs @@ -331,7 +331,8 @@ async fn splunk_passthrough_token() { tls: None, acknowledgements: Default::default(), default_namespace: None, - path: None + path: None, + rejection_report: Default::default(), }; let cx = SinkContext::default(); diff --git a/src/sinks/util/mod.rs b/src/sinks/util/mod.rs index 4af68e03b..63bce66a1 100644 --- a/src/sinks/util/mod.rs +++ b/src/sinks/util/mod.rs @@ -15,6 +15,7 @@ pub mod normalizer; pub mod partitioner; pub mod processed_event; pub mod request_builder; +pub mod rejection_report; pub mod retries; pub mod service; pub mod sink; @@ -49,6 +50,7 @@ pub use jwt_auth::AuthTokenConfig; pub use jwt_auth::{AuthState, AuthToken}; pub use compressor::Compressor; pub use compressor::Decompressor; +pub use rejection_report::{emit_rejection_error, RejectionContext, RejectionReport}; pub use normalizer::Normalizer; pub use request_builder::{IncrementalRequestBuilder, RequestBuilder}; pub use service::{ diff --git a/src/sinks/util/rejection_report.rs b/src/sinks/util/rejection_report.rs new file mode 100644 index 000000000..42fef17f9 --- /dev/null +++ b/src/sinks/util/rejection_report.rs @@ -0,0 +1,208 @@ +use bytes::Bytes; +use vector_lib::configurable::configurable_component; + +use super::{Compression, Decompressor}; + +/// Controls how much detail is logged when a sink's HTTP request is rejected. +#[configurable_component] +#[derive(Clone, Debug, Eq, PartialEq)] +#[serde(deny_unknown_fields, rename_all = "snake_case")] +pub enum RejectionReport { + /// Increment counters only; do not log request or response bodies. + #[serde(alias = "normal")] + Stats, + + /// Log the HTTP response body on rejection. + Response, + + /// Log both the request payload and the HTTP response body (may be large; + /// use smaller batch sizes when debugging with this mode). + RequestResponse, +} + +impl Default for RejectionReport { + fn default() -> Self { + Self::Stats + } +} + +impl RejectionReport { + /// `true` only for `RequestResponse` — the caller must clone request bytes before the send. + pub fn needs_request(&self) -> bool { + matches!(self, Self::RequestResponse) + } +} + +/// Sink-specific behaviour plugged into `emit_rejection_error`. +/// +/// Each sink implements this to provide its own counters and response parsing. +/// The generic `emit_rejection_error` handles the three `RejectionReport` mode branches. +pub trait RejectionContext: Send + Sync { + /// Human-readable error code string (default: `"http_response_"`). + fn error_code(&self, status: u16) -> String { + format!("http_response_{status}") + } + + /// Human-readable message describing the rejection. + fn error_message(&self, status: u16, body: &Bytes) -> String; + + /// Update sink-specific counters. Called once per rejection before logging. + fn record_rejection(&self, status: u16, body: &Bytes); +} + +/// Emit a structured error log for a rejected or errored HTTP response. +/// +/// Handles all three `RejectionReport` modes. `request` must be +/// `Some((compressed_body, compression))` when `mode` is `RequestResponse`; +/// pass `None` otherwise (or when the request body is unavailable, e.g. 5xx). +pub fn emit_rejection_error( + context: &C, + status: u16, + response_body: &Bytes, + request: Option<(Bytes, Compression)>, + mode: RejectionReport, +) { + context.record_rejection(status, response_body); + let error_code = context.error_code(status); + let message = context.error_message(status, response_body); + let response_body_str = String::from_utf8_lossy(response_body); + + match (mode, request) { + (RejectionReport::RequestResponse, Some((body, comp))) => { + let decomp = Decompressor::from(comp); + let req_data = match decomp.decompress(body) { + Ok(data) => data, + Err(err) => format!("- decompression failed({comp}): '{err}' -").into(), + }; + error!( + message = message, + error_code = error_code, + response_status = status, + response_body = %response_body_str, + request = %String::from_utf8_lossy(&req_data), + ); + } + (RejectionReport::Stats, _) => { + error!( + message = message, + error_code = error_code, + ); + } + _ => { + // Covers `Response` mode and `RequestResponse` without a body + // (e.g. 5xx where the request payload is suppressed). + error!( + message = message, + error_code = error_code, + response_status = status, + response_body = %response_body_str, + ); + } + } +} + +#[cfg(test)] +mod tests { + use std::sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }; + + use super::*; + + #[test] + fn default_is_stats() { + assert_eq!(RejectionReport::default(), RejectionReport::Stats); + } + + #[test] + fn needs_request_is_true_only_for_request_response() { + assert!(!RejectionReport::Stats.needs_request()); + assert!(!RejectionReport::Response.needs_request()); + assert!(RejectionReport::RequestResponse.needs_request()); + } + + #[test] + fn serde_roundtrip() { + let cases: &[(&str, RejectionReport)] = &[ + (r#""stats""#, RejectionReport::Stats), + (r#""normal""#, RejectionReport::Stats), // alias + (r#""response""#, RejectionReport::Response), + (r#""request_response""#, RejectionReport::RequestResponse), + ]; + for (input, expected) in cases { + let parsed: RejectionReport = serde_json::from_str(input) + .unwrap_or_else(|_| panic!("failed to parse {input}")); + assert_eq!(&parsed, expected, "input={input}"); + } + for variant in [RejectionReport::Stats, RejectionReport::Response, RejectionReport::RequestResponse] { + let json = serde_json::to_string(&variant).unwrap(); + let back: RejectionReport = serde_json::from_str(&json).unwrap(); + assert_eq!(back, variant); + } + } + + struct MockContext { + call_count: Arc, + } + + impl RejectionContext for MockContext { + fn error_message(&self, status: u16, _body: &Bytes) -> String { + format!("test error {status}") + } + + fn record_rejection(&self, _status: u16, _body: &Bytes) { + self.call_count.fetch_add(1, Ordering::Relaxed); + } + } + + fn make_ctx() -> (MockContext, Arc) { + let count = Arc::new(AtomicU64::new(0)); + let ctx = MockContext { call_count: Arc::clone(&count) }; + (ctx, count) + } + + #[test] + fn record_rejection_called_once_per_invocation_for_every_mode() { + let body = Bytes::from("error body"); + + let (ctx, count) = make_ctx(); + emit_rejection_error(&ctx, 400, &body, None, RejectionReport::Stats); + assert_eq!(count.load(Ordering::Relaxed), 1); + + let (ctx, count) = make_ctx(); + emit_rejection_error(&ctx, 400, &body, None, RejectionReport::Response); + assert_eq!(count.load(Ordering::Relaxed), 1); + + let (ctx, count) = make_ctx(); + // RequestResponse without a request body falls back to response-only logging. + emit_rejection_error(&ctx, 400, &body, None, RejectionReport::RequestResponse); + assert_eq!(count.load(Ordering::Relaxed), 1); + + let (ctx, count) = make_ctx(); + let req = Bytes::from("request body"); + emit_rejection_error(&ctx, 400, &body, Some((req, Compression::None)), RejectionReport::RequestResponse); + assert_eq!(count.load(Ordering::Relaxed), 1); + } + + #[test] + fn request_response_mode_decompresses_uncompressed_body_without_panic() { + let (ctx, _) = make_ctx(); + let response_body = Bytes::from("response body"); + let request_body = Bytes::from("request payload"); + emit_rejection_error( + &ctx, + 400, + &response_body, + Some((request_body, Compression::None)), + RejectionReport::RequestResponse, + ); + } + + #[test] + fn error_code_default_impl_formats_status() { + let (ctx, _) = make_ctx(); + assert_eq!(ctx.error_code(400), "http_response_400"); + assert_eq!(ctx.error_code(503), "http_response_503"); + } +} diff --git a/src/sources/splunk_hec/mod.rs b/src/sources/splunk_hec/mod.rs index d313c34ce..17301e43c 100644 --- a/src/sources/splunk_hec/mod.rs +++ b/src/sources/splunk_hec/mod.rs @@ -1380,6 +1380,7 @@ mod tests { auto_extract_timestamp: None, endpoint_target: Default::default(), timestamp_configuration: None, + rejection_report: Default::default(), } .build(SinkContext::default()) .await