Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/source/user-guide/latest/operators.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ not supported by Comet will fall back to regular Spark execution.
| ExpandExec | Yes | |
| FileSourceScanExec | Yes | Supports Parquet files. See the [Comet Compatibility Guide] for more information. |
| FilterExec | Yes | |
| GenerateExec | Yes | Supports `explode` generator only. |
| GenerateExec | Yes | Supports `explode` and `posexplode` generators (arrays only, `_outer` variants are incompatible). |
| GlobalLimitExec | Yes | |
| HashAggregateExec | Yes | |
| InsertIntoHadoopFsRelationCommand | No | Experimental support for native Parquet writes. Disabled by default. |
Expand Down
30 changes: 15 additions & 15 deletions docs/source/user-guide/latest/understanding-comet-plans.md
Original file line number Diff line number Diff line change
Expand Up @@ -158,21 +158,21 @@ by role. Names match what is shown in the plan output.
These run natively in DataFusion. When several appear consecutively in a plan,
they execute as a single fused native block.

| Node | Spark equivalent |
| ---------------------------- | ---------------------------------------------- |
| `CometProject` | `ProjectExec` |
| `CometFilter` | `FilterExec` |
| `CometSort` | `SortExec` |
| `CometLocalLimit` | `LocalLimitExec` |
| `CometGlobalLimit` | `GlobalLimitExec` |
| `CometExpand` | `ExpandExec` |
| `CometExplode` | `GenerateExec` (for `explode` only) |
| `CometHashAggregate` | `HashAggregateExec`, `ObjectHashAggregateExec` |
| `CometHashJoin` | `ShuffledHashJoinExec` |
| `CometBroadcastHashJoin` | `BroadcastHashJoinExec` |
| `CometSortMergeJoin` | `SortMergeJoinExec` |
| `CometWindow` | `WindowExec` |
| `CometTakeOrderedAndProject` | `TakeOrderedAndProjectExec` |
| Node | Spark equivalent |
| ---------------------------- | ----------------------------------------------- |
| `CometProject` | `ProjectExec` |
| `CometFilter` | `FilterExec` |
| `CometSort` | `SortExec` |
| `CometLocalLimit` | `LocalLimitExec` |
| `CometGlobalLimit` | `GlobalLimitExec` |
| `CometExpand` | `ExpandExec` |
| `CometExplode` | `GenerateExec` (for `explode` and `posexplode`) |
| `CometHashAggregate` | `HashAggregateExec`, `ObjectHashAggregateExec` |
| `CometHashJoin` | `ShuffledHashJoinExec` |
| `CometBroadcastHashJoin` | `BroadcastHashJoinExec` |
| `CometSortMergeJoin` | `SortMergeJoinExec` |
| `CometWindow` | `WindowExec` |
| `CometTakeOrderedAndProject` | `TakeOrderedAndProjectExec` |

### JVM-Side Operators

Expand Down
140 changes: 140 additions & 0 deletions native/core/src/execution/expressions/list_positions.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::any::Any;
use std::fmt::{Display, Formatter};
use std::hash::{Hash, Hasher};
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, Int32Array, ListArray, RecordBatch};
use arrow::datatypes::{DataType, Field, FieldRef, Schema};
use datafusion::common::{exec_err, Result as DataFusionResult};
use datafusion::physical_expr::PhysicalExpr;
use datafusion::physical_plan::ColumnarValue;

/// A `PhysicalExpr` that takes a `List<T>` input and produces a `List<Int32>` where each row's
/// values are `[0, 1, ..., len - 1]`. Offsets and the null bitmap are inherited from the input,
/// so when the resulting list is unnested in parallel with the original list it produces the
/// `pos` column expected by Spark's `posexplode`.
#[derive(Debug, Clone)]
pub struct ListPositionsExpr {
    // Expression producing the `List<T>` input column whose per-element positions we generate.
    child: Arc<dyn PhysicalExpr>,
    // Cached output field (`List<Int32>`, nullable); constructed once in `new`.
    field: FieldRef,
}

impl ListPositionsExpr {
    /// Creates a positions expression over `child`, which must evaluate to a `List<T>` column.
    ///
    /// The output field is fixed as a nullable `List<Int32>` regardless of the
    /// input element type, matching the `pos` column of Spark's `posexplode`.
    pub fn new(child: Arc<dyn PhysicalExpr>) -> Self {
        let element = Field::new("item", DataType::Int32, true);
        let list_type = DataType::List(Arc::new(element));
        Self {
            child,
            field: Arc::new(Field::new("item", list_type, true)),
        }
    }
}

impl Display for ListPositionsExpr {
    /// Renders as `list_positions(<child>)`, delegating to the child's `Display`.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "list_positions(")?;
        Display::fmt(&self.child, f)?;
        write!(f, ")")
    }
}

impl PartialEq for ListPositionsExpr {
    fn eq(&self, other: &Self) -> bool {
        // `field` is a constant derived in `new`, so comparing children suffices.
        self.child.eq(&other.child)
    }
}

impl Eq for ListPositionsExpr {}

impl Hash for ListPositionsExpr {
    fn hash<H: Hasher>(&self, state: &mut H) {
        // Must stay consistent with `PartialEq`: only `child` participates.
        self.child.hash(state);
    }
}

impl PhysicalExpr for ListPositionsExpr {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn fmt_sql(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        Display::fmt(self, f)
    }

    /// Always `List<Int32>`, independent of the input element type.
    fn data_type(&self, _input_schema: &Schema) -> DataFusionResult<DataType> {
        Ok(self.field.data_type().clone())
    }

    fn nullable(&self, _input_schema: &Schema) -> DataFusionResult<bool> {
        Ok(true)
    }

    /// Evaluates the child to a `ListArray` and produces a `List<Int32>` with the
    /// same offsets and null bitmap, where each row holds `[0, 1, ..., len - 1]`.
    ///
    /// Returns an error if the child does not evaluate to a `List` array
    /// (e.g. `LargeList` is not supported here).
    fn evaluate(&self, batch: &RecordBatch) -> DataFusionResult<ColumnarValue> {
        let value = self.child.evaluate(batch)?;
        let array = value.into_array(batch.num_rows())?;

        let list = match array.as_any().downcast_ref::<ListArray>() {
            Some(list) => list,
            None => {
                return exec_err!(
                    "ListPositionsExpr expected List input, got {}",
                    array.data_type()
                );
            }
        };

        let offsets = list.offsets();
        // The offsets of a (possibly sliced) ListArray index into a shared values
        // buffer and need not start at 0. Since the same offsets are reused for the
        // output, the new values buffer must span up to the LAST offset — otherwise
        // `ListArray::new` fails its offsets-vs-values length validation for sliced
        // inputs. Slots before the first offset are never referenced; leave them 0.
        let total_len = *offsets.last().unwrap() as usize;
        let mut values = vec![0i32; total_len];
        for window in offsets.windows(2) {
            let (start, end) = (window[0] as usize, window[1] as usize);
            for (pos, slot) in values[start..end].iter_mut().enumerate() {
                *slot = pos as i32;
            }
        }

        let element_field = Arc::new(Field::new("item", DataType::Int32, true));
        let result = ListArray::new(
            element_field,
            // Reuse input offsets/nulls so the positions list lines up row-for-row
            // with the input list when both are unnested in parallel.
            offsets.clone(),
            Arc::new(Int32Array::from(values)),
            list.nulls().cloned(),
        );

        Ok(ColumnarValue::Array(Arc::new(result) as ArrayRef))
    }

    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
        vec![&self.child]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn PhysicalExpr>>,
    ) -> DataFusionResult<Arc<dyn PhysicalExpr>> {
        if children.len() != 1 {
            return exec_err!(
                "ListPositionsExpr expects exactly 1 child, got {}",
                children.len()
            );
        }
        Ok(Arc::new(ListPositionsExpr::new(Arc::clone(&children[0]))))
    }
}
1 change: 1 addition & 0 deletions native/core/src/execution/expressions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
pub mod arithmetic;
pub mod bitwise;
pub mod comparison;
pub mod list_positions;
pub mod logical;
pub mod nullcheck;
pub mod partition;
Expand Down
51 changes: 30 additions & 21 deletions native/core/src/execution/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub mod operator_registry;
use crate::execution::operators::init_csv_datasource_exec;
use crate::execution::operators::IcebergScanExec;
use crate::execution::{
expressions::list_positions::ListPositionsExpr,
expressions::subquery::Subquery,
operators::{ExecutionError, ExpandExec, ParquetWriterExec, ScanExec, ShuffleScanExec},
planner::expression_registry::ExpressionRegistry,
Expand Down Expand Up @@ -1643,12 +1644,8 @@ impl PhysicalPlanner {
.map(|expr| self.create_expr(expr, child.schema()))
.collect::<Result<Vec<_>, _>>()?;

// For UnnestExec, we need to add a projection to put the columns in the right order:
// 1. First add all projection columns
// 2. Then add the array column to be exploded
// Then UnnestExec will unnest the last column

// Use return_field() to get the proper column names from the expressions
// For posexplode, a parallel List<Int32> positions column is added before the
// array column so UnnestExec can unnest both in parallel.
let child_schema = child.schema();
let mut project_exprs: Vec<(Arc<dyn PhysicalExpr>, String)> = projections
.iter()
Expand All @@ -1661,34 +1658,43 @@ impl PhysicalPlanner {
})
.collect();

// Add the array column as the last column
let array_field = child_expr
.return_field(&child_schema)
.expect("Failed to get field from array expression");
let array_col_name = array_field.name().to_string();

if explode.position {
let positions_expr: Arc<dyn PhysicalExpr> =
Arc::new(ListPositionsExpr::new(Arc::clone(&child_expr)));
project_exprs.push((positions_expr, "pos".to_string()));
}
project_exprs.push((Arc::clone(&child_expr), array_col_name.clone()));

// Create a projection to arrange columns as needed
let project_exec = Arc::new(ProjectionExec::try_new(
project_exprs,
Arc::clone(&child.native_plan),
)?);

// Get the input schema from the projection
let project_schema = project_exec.schema();

// Build the output schema for UnnestExec
// The output schema replaces the list column with its element type
let mut output_fields: Vec<Field> = Vec::new();

// Add all projection columns (non-array columns)
for i in 0..projections.len() {
output_fields.push(project_schema.field(i).clone());
}

// Add the unnested array element field
let array_input_index = if explode.position {
// pos is non-nullable since outer=true is rejected at planning time.
output_fields.push(Field::new("pos", DataType::Int32, false));
projections.len() + 1
} else {
projections.len()
};

// Extract the element type from the list/array type
let array_field = project_schema.field(projections.len());
let array_field = project_schema.field(array_input_index);
let element_type = match array_field.data_type() {
DataType::List(field) => field.data_type().clone(),
dt => {
Expand All @@ -1699,8 +1705,6 @@ impl PhysicalPlanner {
}
};

// The output column has the same name as the input array column
// but with the element type instead of the list type
output_fields.push(Field::new(
array_field.name(),
element_type,
Expand All @@ -1709,12 +1713,17 @@ impl PhysicalPlanner {

let output_schema = Arc::new(Schema::new(output_fields));

// Use UnnestExec to explode the last column (the array column)
// ListUnnest specifies which column to unnest and the depth (1 for single level)
let list_unnest = ListUnnest {
index_in_input_schema: projections.len(), // Index of the array column to unnest
depth: 1, // Unnest one level (explode single array)
};
let mut list_unnests = Vec::with_capacity(2);
if explode.position {
list_unnests.push(ListUnnest {
index_in_input_schema: projections.len(),
depth: 1,
});
}
list_unnests.push(ListUnnest {
index_in_input_schema: array_input_index,
depth: 1,
});

let unnest_options = UnnestOptions {
preserve_nulls: explode.outer,
Expand All @@ -1723,7 +1732,7 @@ impl PhysicalPlanner {

let unnest_exec = Arc::new(UnnestExec::new(
project_exec,
vec![list_unnest],
list_unnests,
vec![], // No struct columns to unnest
output_schema,
unnest_options,
Expand Down
2 changes: 2 additions & 0 deletions native/proto/src/proto/operator.proto
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,8 @@ message Explode {
bool outer = 2;
// Expressions for other columns to project alongside the exploded values
repeated spark.spark_expression.Expr project_list = 3;
// Whether to emit a position column alongside the exploded values (posexplode)
bool position = 4;
}

message HashJoin {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1218,7 +1218,8 @@ object CometExplodeExec extends CometOperatorSerde[GenerateExec] {
if (op.generator.children.length != 1) {
return Unsupported(Some("generators with multiple inputs are not supported"))
}
if (op.generator.nodeName.toLowerCase(Locale.ROOT) != "explode") {
val nodeName = op.generator.nodeName.toLowerCase(Locale.ROOT)
if (nodeName != "explode" && nodeName != "posexplode") {
return Unsupported(Some(s"Unsupported generator: ${op.generator.nodeName}"))
}
if (op.outer) {
Expand Down Expand Up @@ -1262,10 +1263,13 @@ object CometExplodeExec extends CometOperatorSerde[GenerateExec] {
return None
}

val isPosExplode = op.generator.nodeName.toLowerCase(Locale.ROOT) == "posexplode"

val explodeBuilder = OperatorOuterClass.Explode
.newBuilder()
.setChild(childExprProto.get)
.setOuter(op.outer)
.setPosition(isPosExplode)
.addAllProjectList(projectExprs.map(_.get).asJava)

Some(builder.setExplode(explodeBuilder).build())
Expand Down
Loading
Loading