diff --git a/docs/source/contributor-guide/debugging.md b/docs/source/contributor-guide/debugging.md index e5372d922d..ce988cac21 100644 --- a/docs/source/contributor-guide/debugging.md +++ b/docs/source/contributor-guide/debugging.md @@ -220,3 +220,73 @@ Example log output: ``` When backtraces are enabled (see earlier section) then backtraces will be included for failed allocations. + +### Dumping native stream output with the `debug` module + +The `native/core/src/debug/` module ships small wrappers that print every +`RecordBatch` (and, at expression granularity, every `ColumnarValue`) flowing +through a node. They are gated behind `#[cfg(debug_assertions)]` in +`native/core/src/lib.rs`, so they exist in every `cargo check` / `cargo test` +/ `make core` build but are stripped from `make release`. CI builds them on +every run, so the wrappers cannot bitrot. + +Two types are exposed: + +- `DebugExecutionDataStream` — wraps an `Arc<dyn ExecutionPlan>`, prints each batch it + produces. +- `DebugExecutionDataPhyExpr` — wraps an `Arc<dyn PhysicalExpr>`, prints the input + batch and output `ColumnarValue` for every `evaluate()` call. + +Output is `eprintln!` + `dbg!`, i.e. plain stderr of the executor process — the +same place `rust-gdb` / `dbg!` output lands. It is not routed through `log4rs`, +so `spark.comet.debug.enabled` and `COMET_LOG_LEVEL` do not affect it. + +#### Example: dumping window operator output + +In `PhysicalPlanner::create_plan` in `native/core/src/execution/planner.rs`, +the `OpStruct::Window` arm builds a `BoundedWindowAggExec` and hands it to +`SparkPlan::new`.
Wrap the node between those two steps: + +```rust +let window_agg = Arc::new(BoundedWindowAggExec::try_new( + window_expr?, + Arc::clone(&child.native_plan), + InputOrderMode::Sorted, + !partition_exprs.is_empty(), +)?); + +#[cfg(debug_assertions)] +let window_agg: Arc<dyn ExecutionPlan> = { + use crate::debug::DebugExecutionDataStream; + Arc::new(DebugExecutionDataStream::new("window-output", window_agg)) +}; + +Ok(( + scans, + shuffle_scans, + Arc::new(SparkPlan::new(spark_plan.plan_id, window_agg, vec![child])), +)) +``` + +The `let` rebinding keeps the original `window_agg` typed concretely on release +builds (where the `cfg` block is compiled out) and re-binds it to +`Arc<dyn ExecutionPlan>` when the debug module is active. No annotation is +needed on the first `let` because release builds use the original binding +directly. + +Rebuild with `make core` and run a window test, for example: + +```sh +./mvnw test -Dsuites="org.apache.comet.exec.CometWindowExecSuite window query with rangeBetween" -Dtest=none +``` + +Sample stderr excerpt (abbreviated): + +```text +[comet-debug] DebugExecutionDataStream[window-output] execute(partition=0) +[core/src/debug/debug_batch_stream.rs:30] batch = RecordBatch { columns: [...], row_count: 4 } +[core/src/debug/debug_batch_stream.rs:37] col_idx = 0 +[core/src/debug/debug_batch_stream.rs:37] column = PrimitiveArray [1, 1, 2, 2] +[core/src/debug/debug_batch_stream.rs:37] column.nulls() = None +... +``` diff --git a/native/core/src/debug/debug_batch_stream.rs b/native/core/src/debug/debug_batch_stream.rs new file mode 100644 index 0000000000..b59b6a36f7 --- /dev/null +++ b/native/core/src/debug/debug_batch_stream.rs @@ -0,0 +1,201 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::any::Any; +use std::fmt; +use std::hash::{Hash, Hasher}; +use std::sync::Arc; + +use arrow::array::RecordBatch; +use arrow::datatypes::{DataType, Schema}; +use datafusion::common::Result; +use datafusion::execution::SendableRecordBatchStream; +use datafusion::logical_expr::ColumnarValue; +use datafusion::physical_expr::PhysicalExpr; +use datafusion::physical_plan::stream::RecordBatchStreamAdapter; + +/// Wraps a `SendableRecordBatchStream` to print each batch as it flows through. +pub fn dbg_batch_stream(stream: SendableRecordBatchStream) -> SendableRecordBatchStream { + use futures::StreamExt; + let schema = stream.schema(); + let printing_stream = stream.map(|batch_result| { + match &batch_result { + Ok(batch) => { + dbg!(batch, batch.schema()); + for (col_idx, column) in batch.columns().iter().enumerate() { + dbg!(col_idx, column, column.nulls()); + } + } + Err(e) => { + println!("batch error: {:?}", e); + } + } + batch_result + }); + Box::pin(RecordBatchStreamAdapter::new(schema, printing_stream)) +} + +/// `ExecutionPlan` wrapper that prints every batch produced by `inner`. 
+#[derive(Debug)] +pub struct DebugExecutionDataStream { + label: String, + inner: Arc<dyn datafusion::physical_plan::ExecutionPlan>, +} + +impl DebugExecutionDataStream { + pub fn new( + label: impl Into<String>, + inner: Arc<dyn datafusion::physical_plan::ExecutionPlan>, + ) -> Self { + Self { + label: label.into(), + inner, + } + } +} + +impl datafusion::physical_plan::DisplayAs for DebugExecutionDataStream { + fn fmt_as( + &self, + _t: datafusion::physical_plan::DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + write!(f, "DebugExecutionDataStream[{}]", self.label) + } +} + +impl datafusion::physical_plan::ExecutionPlan for DebugExecutionDataStream { + fn name(&self) -> &str { + "DebugExecutionDataStream" + } + fn as_any(&self) -> &dyn std::any::Any { + self + } + fn properties(&self) -> &datafusion::physical_plan::PlanProperties { + self.inner.properties() + } + fn children(&self) -> Vec<&Arc<dyn datafusion::physical_plan::ExecutionPlan>> { + vec![&self.inner] + } + fn with_new_children( + self: Arc<Self>, + children: Vec<Arc<dyn datafusion::physical_plan::ExecutionPlan>>, + ) -> datafusion::common::Result<Arc<dyn datafusion::physical_plan::ExecutionPlan>> { + Ok(Arc::new(DebugExecutionDataStream::new( + self.label.clone(), + Arc::clone(&children[0]), + ))) + } + fn execute( + &self, + partition: usize, + context: Arc<datafusion::execution::TaskContext>, + ) -> datafusion::common::Result<SendableRecordBatchStream> { + eprintln!( + "[comet-debug] DebugExecutionDataStream[{}] execute(partition={})", + self.label, partition + ); + let stream = self.inner.execute(partition, context)?; + Ok(dbg_batch_stream(stream)) + } +} + +/// `PhysicalExpr` wrapper that prints every `evaluate()` call: input +/// `RecordBatch` and the resulting `ColumnarValue`.
+#[derive(Debug)] +pub struct DebugExecutionDataPhyExpr { + label: String, + inner: Arc<dyn PhysicalExpr>, +} + +impl DebugExecutionDataPhyExpr { + pub fn new(label: impl Into<String>, inner: Arc<dyn PhysicalExpr>) -> Self { + Self { + label: label.into(), + inner, + } + } +} + +impl fmt::Display for DebugExecutionDataPhyExpr { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "DebugExecutionDataPhyExpr[{}]({})", + self.label, self.inner + ) + } +} + +impl PartialEq for DebugExecutionDataPhyExpr { + fn eq(&self, other: &Self) -> bool { + self.label == other.label && self.inner.eq(&other.inner) + } +} +impl Eq for DebugExecutionDataPhyExpr {} +impl Hash for DebugExecutionDataPhyExpr { + fn hash<H: Hasher>(&self, state: &mut H) { + self.label.hash(state); + self.inner.hash(state); + } +} + +impl PhysicalExpr for DebugExecutionDataPhyExpr { + fn as_any(&self) -> &dyn Any { + self + } + fn data_type(&self, input_schema: &Schema) -> Result<DataType> { + self.inner.data_type(input_schema) + } + fn nullable(&self, input_schema: &Schema) -> Result<bool> { + self.inner.nullable(input_schema) + } + fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> { + eprintln!( + "[comet-debug] DebugExecutionDataPhyExpr[{}].evaluate(rows={}, cols={})", + self.label, + batch.num_rows(), + batch.num_columns() + ); + dbg!(batch, batch.schema()); + let out = self.inner.evaluate(batch)?; + match &out { + ColumnarValue::Array(arr) => { + dbg!(arr.len(), arr.nulls(), arr); + } + ColumnarValue::Scalar(s) => { + dbg!(s); + } + } + Ok(out) + } + fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> { + vec![&self.inner] + } + fn with_new_children( + self: Arc<Self>, + children: Vec<Arc<dyn PhysicalExpr>>, + ) -> Result<Arc<dyn PhysicalExpr>> { + Ok(Arc::new(DebugExecutionDataPhyExpr::new( + self.label.clone(), + Arc::clone(&children[0]), + ))) + } + fn fmt_sql(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.inner.fmt_sql(f) + } +} diff --git a/native/core/src/debug/mod.rs b/native/core/src/debug/mod.rs new file mode 100644 index 0000000000..84f6566eb8 --- /dev/null +++ b/native/core/src/debug/mod.rs @@
-0,0 +1,22 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +pub mod debug_batch_stream; + +pub use debug_batch_stream::{ + dbg_batch_stream, DebugExecutionDataPhyExpr, DebugExecutionDataStream, +}; diff --git a/native/core/src/lib.rs b/native/core/src/lib.rs index a9fdf6cdc7..7d0b6a5454 100644 --- a/native/core/src/lib.rs +++ b/native/core/src/lib.rs @@ -69,6 +69,9 @@ use errors::{try_unwrap_or_throw, CometError, CometResult}; pub mod common; pub mod execution; pub mod parquet; +// this module is for non release only. 
Intended for debugging/profiling purposes +#[cfg(debug_assertions)] +pub mod debug; #[cfg(all( not(target_env = "msvc"), diff --git a/native/core/src/parquet/parquet_exec.rs b/native/core/src/parquet/parquet_exec.rs index 29b792e72d..ab151e3d4a 100644 --- a/native/core/src/parquet/parquet_exec.rs +++ b/native/core/src/parquet/parquet_exec.rs @@ -28,11 +28,9 @@ use datafusion::datasource::physical_plan::{ }; use datafusion::datasource::source::DataSourceExec; use datafusion::execution::object_store::ObjectStoreUrl; -use datafusion::execution::SendableRecordBatchStream; use datafusion::physical_expr::expressions::{BinaryExpr, Column}; use datafusion::physical_expr::PhysicalExpr; use datafusion::physical_expr_adapter::PhysicalExprAdapterFactory; -use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use datafusion::prelude::SessionContext; use datafusion::scalar::ScalarValue; use datafusion_comet_spark_expr::EvalMode; @@ -217,25 +215,3 @@ fn get_options( (table_parquet_options, spark_parquet_options) } - -/// Wraps a `SendableRecordBatchStream` to print each batch as it flows through. -/// Returns a new `SendableRecordBatchStream` that yields the same batches. -pub fn dbg_batch_stream(stream: SendableRecordBatchStream) -> SendableRecordBatchStream { - use futures::StreamExt; - let schema = stream.schema(); - let printing_stream = stream.map(|batch_result| { - match &batch_result { - Ok(batch) => { - dbg!(batch, batch.schema()); - for (col_idx, column) in batch.columns().iter().enumerate() { - dbg!(col_idx, column, column.nulls()); - } - } - Err(e) => { - println!("batch error: {:?}", e); - } - } - batch_result - }); - Box::pin(RecordBatchStreamAdapter::new(schema, printing_stream)) -}