Skip to content
5 changes: 3 additions & 2 deletions src/sinks/elasticsearch/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::{
elasticsearch::{
health::ElasticsearchHealthLogic,
retry::ElasticsearchRetryLogic,
service::{ElasticsearchService, HttpRequestBuilder, Telemetry},
service::{ElasticsearchRejectionContext, ElasticsearchService, HttpRequestBuilder, Telemetry},
sink::ElasticsearchSink,
ElasticsearchApiVersion, ElasticsearchAuthConfig, ElasticsearchCommon,
ElasticsearchCommonMode, ElasticsearchMode, RejectionReport, VersionType,
Expand Down Expand Up @@ -573,14 +573,15 @@ impl SinkConfig for ElasticsearchConfig {
"end_pt" => endpt_str,
),
};
let context = std::sync::Arc::new(ElasticsearchRejectionContext { telemetry });

let http_request_builder = HttpRequestBuilder::new(&common, self);
let service = ElasticsearchService::new(
client.clone(),
http_request_builder,
self.rejection_report.clone(),
self.compression.clone(),
telemetry);
context);

(endpoint, service)
})
Expand Down
32 changes: 1 addition & 31 deletions src/sinks/elasticsearch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,37 +77,7 @@ impl Default for ElasticsearchMode {
}
}

/// Elasticsearch rejection reporting mode.
#[configurable_component]
#[derive(Clone, Debug, Eq, PartialEq)]
#[serde(deny_unknown_fields, rename_all = "snake_case")]
pub enum RejectionReport {
/// Do not print rejection response, only increment stats
#[serde(alias = "normal")]
Stats,

/// Report entire response, the response may be large and likely not too useful
Response,

/// Report entire request and response may be huge, with smaller batch-sizes this
/// can be useful to debug issues
RequestResponse,
}

impl Default for RejectionReport {
fn default() -> Self {
Self::Stats
}
}

impl RejectionReport {
pub fn needs_request(&self) -> bool {
match self {
Self::RequestResponse => true,
_ => false,
}
}
}
pub use crate::sinks::util::RejectionReport;

/// Bulk API actions.
#[configurable_component]
Expand Down
110 changes: 43 additions & 67 deletions src/sinks/elasticsearch/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::{
sinks::util::{
auth::Auth,
http::{HttpBatchService, RequestConfig},
Compression, ElementCount, Decompressor,
Compression, ElementCount, emit_rejection_error, RejectionContext,
},
};

Expand Down Expand Up @@ -77,6 +77,26 @@ pub struct Telemetry {
pub indexed: Counter,
}

pub struct ElasticsearchRejectionContext {
pub telemetry: Telemetry,
}

impl RejectionContext for ElasticsearchRejectionContext {
fn error_code(&self, status: u16) -> String {
format!("http_response_{status}")
}

fn error_message(&self, status: u16, body: &Bytes) -> String {
err_summary(status, body).msg
}

fn record_rejection(&self, status: u16, body: &Bytes) {
let s = err_summary(status, body);
self.telemetry.rejected.increment(s.rejected);
self.telemetry.indexed.increment(s.indexed);
}
}

#[derive(Clone)]
pub struct ElasticsearchService {
// TODO: `HttpBatchService` has been deprecated for direct use in sinks.
Expand All @@ -88,7 +108,7 @@ pub struct ElasticsearchService {
>,
rej_rpt: RejectionReport,
compression: Compression,
telemetry: Telemetry,
context: Arc<ElasticsearchRejectionContext>,
}

impl ElasticsearchService {
Expand All @@ -97,7 +117,7 @@ impl ElasticsearchService {
http_request_builder: HttpRequestBuilder,
rej_rpt: RejectionReport,
compression: Compression,
telemetry: Telemetry,
context: Arc<ElasticsearchRejectionContext>,
) -> ElasticsearchService {
let http_request_builder = Arc::new(http_request_builder);
let batch_service = HttpBatchService::new(http_client, move |req| {
Expand All @@ -106,7 +126,7 @@ impl ElasticsearchService {
Box::pin(async move { request_builder.build_request(req).await });
future
});
ElasticsearchService { batch_service, rej_rpt, compression, telemetry }
ElasticsearchService { batch_service, rej_rpt, compression, context }
}
}

Expand Down Expand Up @@ -213,14 +233,14 @@ impl Service<ElasticsearchRequest> for ElasticsearchService {
} else {
None
};
let telemetry = self.telemetry.clone();
let context = Arc::clone(&self.context);
Box::pin(async move {
http_service.ready().await?;
let events_byte_size =
std::mem::take(req.metadata_mut()).into_events_estimated_json_encoded_byte_size();
let http_response = http_service.call(req).await?;

let event_status = get_event_status(&http_response, req_for_rpt, rej_rpt, telemetry);
let event_status = get_event_status(&http_response, req_for_rpt, rej_rpt, &context);
Ok(ElasticsearchResponse {
event_status,
http_response,
Expand All @@ -230,8 +250,6 @@ impl Service<ElasticsearchRequest> for ElasticsearchService {
}
}

const ES_REJ_RPT: &str = "es_rej_rpt";

fn response_frag(key: &str, val_prefix: &str) -> String {
format!("\"{key}\":{val_prefix}")
}
Expand All @@ -244,97 +262,55 @@ struct ErrSummary {
rejected: u64,
}

fn err_summary(response: &Response<Bytes>) -> ErrSummary {
let body = String::from_utf8_lossy(response.body());
let i =
body
fn err_summary(status: u16, body: &Bytes) -> ErrSummary {
let body_str = String::from_utf8_lossy(body);
let i: u64 =
body_str
.match_indices(response_frag("status", "201").as_str())
.count()
.try_into()
.unwrap();
let r =
body
let r: u64 =
body_str
.match_indices(response_frag("status", "400").as_str())
.count()
.try_into()
.unwrap();
ErrSummary {
error_code: format!("http_response_{}", response.status().as_u16()),
error_code: format!("http_response_{status}"),
msg: format!("Request contained errors (indexed: {i}, rejected: {r})."),
indexed: i,
rejected: r
}
}

fn emit_bad_response_error(
response: &Response<Bytes>,
request: Option<(ElasticsearchRequest, Compression)>,
rej_rpt: RejectionReport,
telemetry: Telemetry,
) {
let err_summary = err_summary(response);
telemetry.indexed.increment(err_summary.indexed);
telemetry.rejected.increment(err_summary.rejected);

match (rej_rpt, request) {
(RejectionReport::RequestResponse, Some((req, comp))) => {
let decomp = Decompressor::from(comp);
let req_data = match decomp.decompress(req.payload) {
Ok(data) => data,
Err(err) => format!("- decompression failed({comp}): '{err}' -").into()
};

error!(
category = ES_REJ_RPT,
message = err_summary.msg,
error_code = err_summary.error_code,
response = ?response,
request = %String::from_utf8_lossy(&req_data),
);
}
(RejectionReport::Stats, _) => {
error!(
category = ES_REJ_RPT,
message = err_summary.msg,
error_code = err_summary.error_code,
);
}
_ => {
error!(
category = ES_REJ_RPT,
message = err_summary.msg,
error_code = err_summary.error_code,
response = ?response,
);
}
};
}

fn get_event_status(
response: &Response<Bytes>,
request: Option<(ElasticsearchRequest, Compression)>,
rej_rpt: RejectionReport,
telemetry: Telemetry,
context: &ElasticsearchRejectionContext,
) -> EventStatus {
let status = response.status();
if status.is_success() {
let body = String::from_utf8_lossy(response.body());
if body.contains(response_frag("errors", "true").as_str()) {
emit_bad_response_error(response, request, rej_rpt, telemetry);
let body = response.body();
if String::from_utf8_lossy(body).contains(response_frag("errors", "true").as_str()) {
let req = request.map(|(req, comp)| (req.payload, comp));
emit_rejection_error(context, status.as_u16(), body, req, rej_rpt);
EventStatus::Rejected
} else {
EventStatus::Delivered
}
} else if status.is_server_error() {
let rej_rpt = if rej_rpt == RejectionReport::RequestResponse {
let mode = if rej_rpt == RejectionReport::RequestResponse {
RejectionReport::Response
} else {
rej_rpt
};
emit_bad_response_error(response, None, rej_rpt, telemetry);
emit_rejection_error(context, status.as_u16(), response.body(), None, mode);
EventStatus::Errored
} else {
emit_bad_response_error(response, request, rej_rpt, telemetry);
let req = request.map(|(req, comp)| (req.payload, comp));
emit_rejection_error(context, status.as_u16(), response.body(), req, rej_rpt);
EventStatus::Rejected
}
}
Expand Down Expand Up @@ -366,7 +342,7 @@ mod tests {
assert!(body.contains(response_frag("errors", "true").as_str()));

assert_eq!(
err_summary(&res),
err_summary(res.status().as_u16(), res.body()),
ErrSummary {
error_code: "http_response_200".into(),
msg: "Request contained errors (indexed: 259, rejected: 5).".to_string(),
Expand Down
1 change: 1 addition & 0 deletions src/sinks/humio/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ impl HumioLogsConfig {
preserve_timestamp_key: false,
}),
batch_headers: BatchHeaders::default(),
rejection_report: Default::default(),
}
}
}
Expand Down
Loading