From 224229314b0e2ad4ce0951d4e7226852fe403114 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 8 May 2026 17:25:28 -0600 Subject: [PATCH 1/2] feat: add support for `posexplode` and `posexplode_outer` Extend `CometExplodeExec` to accept both `Explode` and `PosExplode` Catalyst nodes. The Explode operator proto carries a new `position` flag; when set, the native planner builds a parallel `List` positions column with `ListPositionsExpr` and unnests it alongside the array column via DataFusion's `UnnestExec`. Maps still fall back (#2837) and `outer=true` keeps its Incompatible status (DataFusion #19053). Closes #4269. --- docs/source/user-guide/latest/operators.md | 2 +- .../latest/understanding-comet-plans.md | 2 +- .../execution/expressions/list_positions.rs | 140 ++++++++++++++++ native/core/src/execution/expressions/mod.rs | 1 + native/core/src/execution/planner.rs | 51 +++--- native/proto/src/proto/operator.proto | 2 + .../apache/spark/sql/comet/operators.scala | 6 +- .../comet/exec/CometGenerateExecSuite.scala | 154 ++++++++++++++++++ 8 files changed, 334 insertions(+), 24 deletions(-) create mode 100644 native/core/src/execution/expressions/list_positions.rs diff --git a/docs/source/user-guide/latest/operators.md b/docs/source/user-guide/latest/operators.md index 1b8f78d9c6..495141f9bb 100644 --- a/docs/source/user-guide/latest/operators.md +++ b/docs/source/user-guide/latest/operators.md @@ -30,7 +30,7 @@ not supported by Comet will fall back to regular Spark execution. | ExpandExec | Yes | | | FileSourceScanExec | Yes | Supports Parquet files. See the [Comet Compatibility Guide] for more information. | | FilterExec | Yes | | -| GenerateExec | Yes | Supports `explode` generator only. | +| GenerateExec | Yes | Supports `explode` and `posexplode` generators (arrays only, `_outer` variants are incompatible). | | GlobalLimitExec | Yes | | | HashAggregateExec | Yes | | | InsertIntoHadoopFsRelationCommand | No | Experimental support for native Parquet writes. Disabled by default. | diff --git a/docs/source/user-guide/latest/understanding-comet-plans.md b/docs/source/user-guide/latest/understanding-comet-plans.md index f66f385207..31bdc3019a 100644 --- a/docs/source/user-guide/latest/understanding-comet-plans.md +++ b/docs/source/user-guide/latest/understanding-comet-plans.md @@ -166,7 +166,7 @@ they execute as a single fused native block. | `CometLocalLimit` | `LocalLimitExec` | | `CometGlobalLimit` | `GlobalLimitExec` | | `CometExpand` | `ExpandExec` | -| `CometExplode` | `GenerateExec` (for `explode` only) | +| `CometExplode` | `GenerateExec` (for `explode` and `posexplode`)| | `CometHashAggregate` | `HashAggregateExec`, `ObjectHashAggregateExec` | | `CometHashJoin` | `ShuffledHashJoinExec` | | `CometBroadcastHashJoin` | `BroadcastHashJoinExec` | diff --git a/native/core/src/execution/expressions/list_positions.rs b/native/core/src/execution/expressions/list_positions.rs new file mode 100644 index 0000000000..117aab4d97 --- /dev/null +++ b/native/core/src/execution/expressions/list_positions.rs @@ -0,0 +1,140 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::any::Any; +use std::fmt::{Display, Formatter}; +use std::hash::{Hash, Hasher}; +use std::sync::Arc; + +use arrow::array::{Array, ArrayRef, Int32Array, ListArray, RecordBatch}; +use arrow::datatypes::{DataType, Field, FieldRef, Schema}; +use datafusion::common::{exec_err, Result as DataFusionResult}; +use datafusion::physical_expr::PhysicalExpr; +use datafusion::physical_plan::ColumnarValue; + +/// A `PhysicalExpr` that takes a `List` input and produces a `List` where each row's +/// values are `[0, 1, ..., len - 1]`. Offsets and the null bitmap are inherited from the input, +/// so when the resulting list is unnested in parallel with the original list it produces the +/// `pos` column expected by Spark's `posexplode`. +#[derive(Debug, Clone)] +pub struct ListPositionsExpr { + child: Arc, + field: FieldRef, +} + +impl ListPositionsExpr { + pub fn new(child: Arc) -> Self { + let field = Arc::new(Field::new( + "item", + DataType::List(Arc::new(Field::new("item", DataType::Int32, true))), + true, + )); + Self { child, field } + } +} + +impl Display for ListPositionsExpr { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "list_positions({})", self.child) + } +} + +impl PartialEq for ListPositionsExpr { + fn eq(&self, other: &Self) -> bool { + self.child.eq(&other.child) + } +} + +impl Eq for ListPositionsExpr {} + +impl Hash for ListPositionsExpr { + fn hash(&self, state: &mut H) { + self.child.hash(state); + } +} + +impl PhysicalExpr for ListPositionsExpr { + fn as_any(&self) -> &dyn Any { + self + } + + fn fmt_sql(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + Display::fmt(self, f) + } + + fn data_type(&self, _input_schema: &Schema) -> DataFusionResult { + Ok(self.field.data_type().clone()) + } + + fn nullable(&self, _input_schema: &Schema) -> DataFusionResult { + Ok(true) + } + + fn evaluate(&self, batch: &RecordBatch) -> DataFusionResult { + let value = self.child.evaluate(batch)?; + let array = value.into_array(batch.num_rows())?; + + let list = match array.as_any().downcast_ref::() { + Some(list) => list, + None => { + return exec_err!( + "ListPositionsExpr expected List input, got {}", + array.data_type() + ); + } + }; + + let offsets = list.offsets(); + let total_len = *offsets.last().unwrap() as usize; + + let mut values: Vec = Vec::with_capacity(total_len); + for window in offsets.windows(2) { + let start = window[0]; + let end = window[1]; + for i in 0..(end - start) { + values.push(i); + } + } + + let element_field = Arc::new(Field::new("item", DataType::Int32, true)); + let result = ListArray::new( + element_field, + offsets.clone(), + Arc::new(Int32Array::from(values)), + list.nulls().cloned(), + ); + + Ok(ColumnarValue::Array(Arc::new(result) as ArrayRef)) + } + + fn children(&self) -> Vec<&Arc> { + vec![&self.child] + } + + fn with_new_children( + self: Arc, + children: Vec>, + ) -> DataFusionResult> { + if children.len() != 1 { + return exec_err!( + "ListPositionsExpr expects exactly 1 child, got {}", + children.len() + ); + } + Ok(Arc::new(ListPositionsExpr::new(Arc::clone(&children[0])))) + } +} diff --git a/native/core/src/execution/expressions/mod.rs b/native/core/src/execution/expressions/mod.rs index c2b144b7dd..e174bd3747 100644 --- a/native/core/src/execution/expressions/mod.rs +++ b/native/core/src/execution/expressions/mod.rs @@ -20,6 +20,7 @@ pub mod arithmetic; pub mod bitwise; pub mod comparison; +pub mod list_positions; pub mod logical; pub mod nullcheck; pub mod partition; diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 478c7a8d98..9efed51f6e 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -24,6 +24,7 @@ pub mod operator_registry; use crate::execution::operators::init_csv_datasource_exec; use crate::execution::operators::IcebergScanExec; use crate::execution::{ + expressions::list_positions::ListPositionsExpr, expressions::subquery::Subquery, operators::{ExecutionError, ExpandExec, ParquetWriterExec, ScanExec, ShuffleScanExec}, planner::expression_registry::ExpressionRegistry, @@ -1643,12 +1644,8 @@ impl PhysicalPlanner { .map(|expr| self.create_expr(expr, child.schema())) .collect::, _>>()?; - // For UnnestExec, we need to add a projection to put the columns in the right order: - // 1. First add all projection columns - // 2. Then add the array column to be exploded - // Then UnnestExec will unnest the last column - - // Use return_field() to get the proper column names from the expressions + // For posexplode, a parallel List positions column is added before the + // array column so UnnestExec can unnest both in parallel. let child_schema = child.schema(); let mut project_exprs: Vec<(Arc, String)> = projections .iter() @@ -1661,24 +1658,26 @@ impl PhysicalPlanner { }) .collect(); - // Add the array column as the last column let array_field = child_expr .return_field(&child_schema) .expect("Failed to get field from array expression"); let array_col_name = array_field.name().to_string(); + + if explode.position { + let positions_expr: Arc = + Arc::new(ListPositionsExpr::new(Arc::clone(&child_expr))); + project_exprs.push((positions_expr, "pos".to_string())); + } project_exprs.push((Arc::clone(&child_expr), array_col_name.clone())); - // Create a projection to arrange columns as needed let project_exec = Arc::new(ProjectionExec::try_new( project_exprs, Arc::clone(&child.native_plan), )?); - // Get the input schema from the projection let project_schema = project_exec.schema(); // Build the output schema for UnnestExec - // The output schema replaces the list column with its element type let mut output_fields: Vec = Vec::new(); // Add all projection columns (non-array columns) @@ -1686,9 +1685,16 @@ impl PhysicalPlanner { output_fields.push(project_schema.field(i).clone()); } - // Add the unnested array element field + let array_input_index = if explode.position { + // pos is non-nullable since outer=true is rejected at planning time. + output_fields.push(Field::new("pos", DataType::Int32, false)); + projections.len() + 1 + } else { + projections.len() + }; + // Extract the element type from the list/array type - let array_field = project_schema.field(projections.len()); + let array_field = project_schema.field(array_input_index); let element_type = match array_field.data_type() { DataType::List(field) => field.data_type().clone(), dt => { @@ -1699,8 +1705,6 @@ impl PhysicalPlanner { } }; - // The output column has the same name as the input array column - // but with the element type instead of the list type output_fields.push(Field::new( array_field.name(), element_type, @@ -1709,12 +1713,17 @@ impl PhysicalPlanner { let output_schema = Arc::new(Schema::new(output_fields)); - // Use UnnestExec to explode the last column (the array column) - // ListUnnest specifies which column to unnest and the depth (1 for single level) - let list_unnest = ListUnnest { - index_in_input_schema: projections.len(), // Index of the array column to unnest - depth: 1, // Unnest one level (explode single array) - }; + let mut list_unnests = Vec::with_capacity(2); + if explode.position { + list_unnests.push(ListUnnest { + index_in_input_schema: projections.len(), + depth: 1, + }); + } + list_unnests.push(ListUnnest { + index_in_input_schema: array_input_index, + depth: 1, + }); let unnest_options = UnnestOptions { preserve_nulls: explode.outer, @@ -1723,7 +1732,7 @@ impl PhysicalPlanner { let unnest_exec = Arc::new(UnnestExec::new( project_exec, - vec![list_unnest], + list_unnests, vec![], // No struct columns to unnest output_schema, unnest_options, diff --git a/native/proto/src/proto/operator.proto b/native/proto/src/proto/operator.proto index 7cefe06da7..68be3beb07 100644 --- a/native/proto/src/proto/operator.proto +++ b/native/proto/src/proto/operator.proto @@ -355,6 +355,8 @@ message Explode { bool outer = 2; // Expressions for other columns to project alongside the exploded values repeated spark.spark_expression.Expr project_list = 3; + // Whether to emit a position column alongside the exploded values (posexplode) + bool position = 4; } message HashJoin { diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala index f315aae6e2..d71c8e10ba 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala @@ -1218,7 +1218,8 @@ object CometExplodeExec extends CometOperatorSerde[GenerateExec] { if (op.generator.children.length != 1) { return Unsupported(Some("generators with multiple inputs are not supported")) } - if (op.generator.nodeName.toLowerCase(Locale.ROOT) != "explode") { + val nodeName = op.generator.nodeName.toLowerCase(Locale.ROOT) + if (nodeName != "explode" && nodeName != "posexplode") { return Unsupported(Some(s"Unsupported generator: ${op.generator.nodeName}")) } if (op.outer) { @@ -1262,10 +1263,13 @@ object CometExplodeExec extends CometOperatorSerde[GenerateExec] { return None } + val isPosExplode = op.generator.nodeName.toLowerCase(Locale.ROOT) == "posexplode" + val explodeBuilder = OperatorOuterClass.Explode .newBuilder() .setChild(childExprProto.get) .setOuter(op.outer) + .setPosition(isPosExplode) .addAllProjectList(projectExprs.map(_.get).asJava) Some(builder.setExplode(explodeBuilder).build()) diff --git a/spark/src/test/scala/org/apache/comet/exec/CometGenerateExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometGenerateExecSuite.scala index a9ac3deb34..d03522afe1 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometGenerateExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometGenerateExecSuite.scala @@ -232,4 +232,158 @@ class CometGenerateExecSuite extends CometTestBase { } } + test("posexplode with simple array") { + withSQLConf( + CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true", + CometConf.COMET_EXEC_EXPLODE_ENABLED.key -> "true") { + val df = Seq((1, Array(10, 20, 30)), (2, Array(40, 50)), (3, Array(60))) + .toDF("id", "arr") + .selectExpr("id", "posexplode(arr) as (pos, value)") + checkSparkAnswerAndOperator(df) + } + } + + test("posexplode with empty array") { + withSQLConf( + CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true", + CometConf.COMET_EXEC_EXPLODE_ENABLED.key -> "true") { + val df = Seq((1, Array(1, 2)), (2, Array.empty[Int]), (3, Array(3))) + .toDF("id", "arr") + .selectExpr("id", "posexplode(arr) as (pos, value)") + checkSparkAnswerAndOperator(df) + } + } + + test("posexplode with null array") { + withSQLConf( + CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true", + CometConf.COMET_EXEC_EXPLODE_ENABLED.key -> "true") { + val df = Seq((1, Some(Array(1, 2))), (2, None), (3, Some(Array(3)))) + .toDF("id", "arr") + .selectExpr("id", "posexplode(arr) as (pos, value)") + checkSparkAnswerAndOperator(df) + } + } + + test("posexplode_outer with simple array") { + withSQLConf( + CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true", + CometConf.getOperatorAllowIncompatConfigKey(classOf[GenerateExec]) -> "true", + CometConf.COMET_EXEC_EXPLODE_ENABLED.key -> "true") { + val df = Seq((1, Array(10, 20, 30)), (2, Array(40, 50)), (3, Array(60))) + .toDF("id", "arr") + .selectExpr("id", "posexplode_outer(arr) as (pos, value)") + checkSparkAnswerAndOperator(df) + } + } + + test("posexplode with array of strings") { + withSQLConf( + CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true", + CometConf.COMET_EXEC_EXPLODE_ENABLED.key -> "true") { + val df = Seq((1, Array("a", "b", "c")), (2, Array("d", "e")), (3, Array("f"))) + .toDF("id", "arr") + .selectExpr("id", "posexplode(arr) as (pos, value)") + checkSparkAnswerAndOperator(df) + } + } + + test("posexplode with nullable elements") { + withSQLConf( + CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true", + CometConf.COMET_EXEC_EXPLODE_ENABLED.key -> "true") { + val df = Seq( + (1, Array[Option[Int]](Some(1), None, Some(3))), + (2, Array[Option[Int]](None, Some(5))), + (3, Array[Option[Int]](Some(6)))) + .toDF("id", "arr") + .selectExpr("id", "posexplode(arr) as (pos, value)") + checkSparkAnswerAndOperator(df) + } + } + + test("posexplode with multiple projected columns") { + withSQLConf( + CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true", + CometConf.COMET_EXEC_EXPLODE_ENABLED.key -> "true") { + val df = + Seq((1, "A", Array(10, 20, 30)), (2, "B", Array(40, 50)), (3, "C", Array(60))) + .toDF("id", "name", "arr") + .selectExpr("id", "name", "posexplode(arr) as (pos, value)") + checkSparkAnswerAndOperator(df) + } + } + + test("posexplode with map input falls back") { + withSQLConf( + CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true", + CometConf.COMET_EXEC_EXPLODE_ENABLED.key -> "true") { + val df = Seq((1, Map("a" -> 1, "b" -> 2)), (2, Map("c" -> 3))) + .toDF("id", "map") + .selectExpr("id", "posexplode(map) as (pos, key, value)") + checkSparkAnswerAndFallbackReason( + df, + "Comet only supports explode/explode_outer for arrays, not maps") + } + } + + test("posexplode with array of structs") { + withSQLConf( + CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true", + CometConf.COMET_EXEC_EXPLODE_ENABLED.key -> "true") { + val df = Seq( + (1, Array((10, "a"), (20, "b"))), + (2, Array((30, "c"))), + (3, Array.empty[(Int, String)])) + .toDF("id", "arr") + .selectExpr("id", "posexplode(arr) as (pos, value)") + .selectExpr("id", "pos", "value._1 as v1", "value._2 as v2") + checkSparkAnswerAndOperator(df) + } + } + + test("posexplode in lateral view") { + withSQLConf( + CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true", + CometConf.COMET_EXEC_EXPLODE_ENABLED.key -> "true") { + withTempView("t") { + Seq((1, Array(10, 20, 30)), (2, Array(40, 50)), (3, Array(60))) + .toDF("id", "arr") + .createOrReplaceTempView("t") + val df = + sql("SELECT t.id, p.pos, p.col FROM t LATERAL VIEW posexplode(t.arr) p AS pos, col") + checkSparkAnswerAndOperator(df) + } + } + } + + test("posexplode of literal array") { + withSQLConf( + CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true", + CometConf.COMET_EXEC_EXPLODE_ENABLED.key -> "true") { + val df = Seq(1, 2, 3) + .toDF("id") + .selectExpr("id", "posexplode(array(100, 200, 300)) as (pos, value)") + checkSparkAnswerAndOperator(df) + } + } + + test("posexplode across batch boundary with small batch size") { + // Force ScanExec to emit multiple small batches so that UnnestExec sees the parallel + // positions/values lists across batch boundaries. Element values are non-trivial so wrong + // alignment between pos and value would be visible in the answer. + withSQLConf( + CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true", + CometConf.COMET_EXEC_EXPLODE_ENABLED.key -> "true", + CometConf.COMET_BATCH_SIZE.key -> "4") { + val rows = (1 to 12).map { i => + (i, (0 until (i % 5 + 1)).map(j => i * 100 + j).toArray) + } + val df = rows + .toDF("id", "arr") + .selectExpr("id", "posexplode(arr) as (pos, value)") + checkSparkAnswerAndOperator(df) + } + } + } From 84b9814ca7775d8e5f397434f0d6b0d773bab8a2 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 8 May 2026 17:40:50 -0600 Subject: [PATCH 2/2] style: prettier table alignment in understanding-comet-plans.md --- .../latest/understanding-comet-plans.md | 30 +++++++++---------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/docs/source/user-guide/latest/understanding-comet-plans.md b/docs/source/user-guide/latest/understanding-comet-plans.md index 31bdc3019a..791f4bf766 100644 --- a/docs/source/user-guide/latest/understanding-comet-plans.md +++ b/docs/source/user-guide/latest/understanding-comet-plans.md @@ -158,21 +158,21 @@ by role. Names match what is shown in the plan output. These run natively in DataFusion. When several appear consecutively in a plan, they execute as a single fused native block. -| Node | Spark equivalent | -| ---------------------------- | ---------------------------------------------- | -| `CometProject` | `ProjectExec` | -| `CometFilter` | `FilterExec` | -| `CometSort` | `SortExec` | -| `CometLocalLimit` | `LocalLimitExec` | -| `CometGlobalLimit` | `GlobalLimitExec` | -| `CometExpand` | `ExpandExec` | -| `CometExplode` | `GenerateExec` (for `explode` and `posexplode`)| -| `CometHashAggregate` | `HashAggregateExec`, `ObjectHashAggregateExec` | -| `CometHashJoin` | `ShuffledHashJoinExec` | -| `CometBroadcastHashJoin` | `BroadcastHashJoinExec` | -| `CometSortMergeJoin` | `SortMergeJoinExec` | -| `CometWindow` | `WindowExec` | -| `CometTakeOrderedAndProject` | `TakeOrderedAndProjectExec` | +| Node | Spark equivalent | +| ---------------------------- | ----------------------------------------------- | +| `CometProject` | `ProjectExec` | +| `CometFilter` | `FilterExec` | +| `CometSort` | `SortExec` | +| `CometLocalLimit` | `LocalLimitExec` | +| `CometGlobalLimit` | `GlobalLimitExec` | +| `CometExpand` | `ExpandExec` | +| `CometExplode` | `GenerateExec` (for `explode` and `posexplode`) | +| `CometHashAggregate` | `HashAggregateExec`, `ObjectHashAggregateExec` | +| `CometHashJoin` | `ShuffledHashJoinExec` | +| `CometBroadcastHashJoin` | `BroadcastHashJoinExec` | +| `CometSortMergeJoin` | `SortMergeJoinExec` | +| `CometWindow` | `WindowExec` | +| `CometTakeOrderedAndProject` | `TakeOrderedAndProjectExec` | ### JVM-Side Operators