
feat: add GroupsAccumulator for variance, stddev, covariance, correlation #4254

Open
andygrove wants to merge 12 commits into apache:main from andygrove:groups-accumulator-stats

Conversation

@andygrove
Member

@andygrove andygrove commented May 6, 2026

Which issue does this PR close?

Closes #1627
Closes #4249

Rationale for this change

| Expression  | Before (ms) | After (ms) | Speedup | Relative vs Spark (before → after) |
|-------------|-------------|------------|---------|------------------------------------|
| var_samp    | 41          | 21         | 1.95x   | 1.6x → 3.1x                        |
| var_pop     | 41          | 21         | 1.95x   | 1.6x → 3.2x                        |
| stddev_samp | 40          | 21         | 1.90x   | 1.6x → 3.2x                        |
| stddev_pop  | 40          | 21         | 1.90x   | 1.6x → 3.1x                        |
| covar_samp  | 93          | 26         | 3.58x   | 0.8x → 2.8x                        |
| covar_pop   | 92          | 25         | 3.68x   | 0.8x → 2.9x                        |
| corr        | 105         | 32         | 3.28x   | 0.7x → 2.5x                        |

Detailed benchmark results are posted in a comment on this PR.

DataFusion's grouped aggregate operator has a fast path that uses
GroupsAccumulator instead of one Accumulator per group. Comet's
existing variance, stddev, covariance, and correlation aggregates only
implement the per-row Accumulator trait, so grouped queries fall back
to a slower row-at-a-time path. Issue #1275 measured a roughly 4x
speedup from applying the same change to avg. After this PR, the four
remaining Comet-owned aggregates are at parity with avg, sum_int,
sum_decimal, and avg_decimal, which already implement the fast path.

What changes are included in this PR?

  • New agg_funcs/welford.rs with the Welford update/merge math used by
    both the per-row and grouped paths. The existing VarianceAccumulator
    and CovarianceAccumulator per-row methods now route through these
    helpers (no behavior change).
  • New VarianceGroupsAccumulator, StddevGroupsAccumulator,
    CovarianceGroupsAccumulator, and CorrelationGroupsAccumulator.
    Each AggregateUDFImpl now returns true from
    groups_accumulator_supported and constructs the matching grouped
    accumulator. Stddev wraps variance; correlation wraps one covariance
    plus two variance group accumulators, mirroring the existing
    Accumulator composition.
  • The grouped accumulators preserve Spark wire format (f64 counts) and
    Spark semantics (null_on_divide_by_zero for the count == 1 branches).
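
The Welford update/merge math described above can be sketched as follows. This is illustrative only: the struct and method names here are hypothetical, not the actual welford.rs API, but the recurrences (online update plus the Chan et al. parallel merge) are the standard ones.

```rust
// Hypothetical sketch of the Welford update/merge helpers; not the real
// welford.rs signatures.

/// Running moments for one group: count, mean, and M2 (sum of squared
/// deviations from the running mean).
#[derive(Clone, Copy, Default)]
struct Moments {
    count: f64, // Spark keeps counts as f64 in the aggregate buffer
    mean: f64,
    m2: f64,
}

impl Moments {
    /// Fold one value into the running moments (Welford's online update).
    fn update(&mut self, value: f64) {
        self.count += 1.0;
        let delta = value - self.mean;
        self.mean += delta / self.count;
        self.m2 += delta * (value - self.mean);
    }

    /// Combine two partial states (Chan et al. parallel merge), as used
    /// when merging per-partition accumulator states.
    fn merge(&mut self, other: &Moments) {
        if other.count == 0.0 {
            return;
        }
        if self.count == 0.0 {
            *self = *other;
            return;
        }
        let total = self.count + other.count;
        let delta = other.mean - self.mean;
        self.m2 += other.m2 + delta * delta * self.count * other.count / total;
        self.mean += delta * other.count / total;
        self.count = total;
    }
}

fn main() {
    // Single pass over [1, 2, 3, 4] ...
    let mut whole = Moments::default();
    for v in [1.0, 2.0, 3.0, 4.0] {
        whole.update(v);
    }
    // ... equals merging two halves, which is what makes the
    // multi-batch-merge-equals-single-shot tests possible.
    let (mut a, mut b) = (Moments::default(), Moments::default());
    for v in [1.0, 2.0] { a.update(v); }
    for v in [3.0, 4.0] { b.update(v); }
    a.merge(&b);
    assert!((whole.mean - a.mean).abs() < 1e-12);
    assert!((whole.m2 - a.m2).abs() < 1e-12); // m2 = 5.0, var_pop = 1.25
}
```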

How are these changes tested?

  • New Rust unit tests cover single-group correctness, multi-group
    correctness, null handling on each input column (correlation needs
    both columns non-null per row), opt_filter masking, multi-batch
    merge equals single-shot, and the empty-group-yields-null and
    single-row-sample edge cases.
  • The grouped paths are picked up automatically by DataFusion's hash
    aggregate, so existing JVM coverage in CometAggregateSuite exercises
    the new code end-to-end. CI runs the full sweep across Spark 3.4,
    3.5, and 4.0.

This PR was scaffolded with the project's superpowers:brainstorming and
superpowers:writing-plans skills.

@andygrove
Member Author

andygrove commented May 6, 2026

before

Running benchmark: var_samp
  Running case: Spark
  Stopped after 30 iterations, 2044 ms
  Running case: Comet (Scan)
  Stopped after 40 iterations, 2045 ms
  Running case: Comet (Scan + Exec)
  Stopped after 48 iterations, 2041 ms

OpenJDK 64-Bit Server VM 17.0.17+10 on Mac OS X 26.3.1
Apple M3 Ultra
var_samp:                                 Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Spark                                                66             68           4         15.8          63.2       1.0X
Comet (Scan)                                         48             51           2         21.6          46.2       1.4X
Comet (Scan + Exec)                                  41             43           3         25.7          38.9       1.6X

Running benchmark: var_pop
  Running case: Spark
  Stopped after 31 iterations, 2054 ms
  Running case: Comet (Scan)
  Stopped after 40 iterations, 2007 ms
  Running case: Comet (Scan + Exec)
  Stopped after 49 iterations, 2029 ms

OpenJDK 64-Bit Server VM 17.0.17+10 on Mac OS X 26.3.1
Apple M3 Ultra
var_pop:                                  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Spark                                                64             66           1         16.3          61.3       1.0X
Comet (Scan)                                         48             50           2         21.6          46.2       1.3X
Comet (Scan + Exec)                                  41             41           1         25.9          38.6       1.6X

Running benchmark: stddev_samp
  Running case: Spark
  Stopped after 31 iterations, 2043 ms
  Running case: Comet (Scan)
  Stopped after 41 iterations, 2009 ms
  Running case: Comet (Scan + Exec)
  Stopped after 49 iterations, 2030 ms

OpenJDK 64-Bit Server VM 17.0.17+10 on Mac OS X 26.3.1
Apple M3 Ultra
stddev_samp:                              Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Spark                                                63             66           1         16.6          60.4       1.0X
Comet (Scan)                                         48             49           2         22.1          45.3       1.3X
Comet (Scan + Exec)                                  40             41           2         26.0          38.4       1.6X

Running benchmark: stddev_pop
  Running case: Spark
  Stopped after 30 iterations, 2024 ms
  Running case: Comet (Scan)
  Stopped after 40 iterations, 2041 ms
  Running case: Comet (Scan + Exec)
  Stopped after 49 iterations, 2019 ms

OpenJDK 64-Bit Server VM 17.0.17+10 on Mac OS X 26.3.1
Apple M3 Ultra
stddev_pop:                               Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Spark                                                65             67           3         16.1          62.0       1.0X
Comet (Scan)                                         48             51           2         21.7          46.1       1.3X
Comet (Scan + Exec)                                  40             41           1         26.0          38.5       1.6X

Running benchmark: covar_samp
  Running case: Spark
  Stopped after 27 iterations, 2015 ms
  Running case: Comet (Scan)
  Stopped after 34 iterations, 2022 ms
  Running case: Comet (Scan + Exec)
  Stopped after 22 iterations, 2074 ms

OpenJDK 64-Bit Server VM 17.0.17+10 on Mac OS X 26.3.1
Apple M3 Ultra
covar_samp:                               Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Spark                                                73             75           2         14.4          69.4       1.0X
Comet (Scan)                                         56             59           3         18.6          53.7       1.3X
Comet (Scan + Exec)                                  93             94           2         11.3          88.4       0.8X

Running benchmark: covar_pop
  Running case: Spark
  Stopped after 27 iterations, 2001 ms
  Running case: Comet (Scan)
  Stopped after 35 iterations, 2019 ms
  Running case: Comet (Scan + Exec)
  Stopped after 21 iterations, 2005 ms

OpenJDK 64-Bit Server VM 17.0.17+10 on Mac OS X 26.3.1
Apple M3 Ultra
covar_pop:                                Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Spark                                                73             74           2         14.4          69.2       1.0X
Comet (Scan)                                         56             58           2         18.8          53.3       1.3X
Comet (Scan + Exec)                                  92             96           7         11.4          87.4       0.8X

Running benchmark: corr
  Running case: Spark
  Stopped after 25 iterations, 2018 ms
  Running case: Comet (Scan)
  Stopped after 31 iterations, 2049 ms
  Running case: Comet (Scan + Exec)
  Stopped after 19 iterations, 2060 ms

OpenJDK 64-Bit Server VM 17.0.17+10 on Mac OS X 26.3.1
Apple M3 Ultra
corr:                                     Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Spark                                                78             81           2         13.4          74.6       1.0X
Comet (Scan)                                         64             66           1         16.4          61.0       1.2X
Comet (Scan + Exec)                                 105            108           3         10.0         100.4       0.7X

after

Running benchmark: var_samp
  Running case: Spark
  Stopped after 30 iterations, 2045 ms
  Running case: Comet (Scan)
  Stopped after 40 iterations, 2035 ms
  Running case: Comet (Scan + Exec)
  Stopped after 90 iterations, 2012 ms

OpenJDK 64-Bit Server VM 17.0.17+10 on Mac OS X 26.3.1
Apple M3 Ultra
var_samp:                                 Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Spark                                                66             68           2         15.9          62.8       1.0X
Comet (Scan)                                         49             51           2         21.5          46.5       1.4X
Comet (Scan + Exec)                                  21             22           1         49.9          20.0       3.1X

Running benchmark: var_pop
  Running case: Spark
  Stopped after 30 iterations, 2010 ms
  Running case: Comet (Scan)
  Stopped after 41 iterations, 2033 ms
  Running case: Comet (Scan + Exec)
  Stopped after 93 iterations, 2005 ms

OpenJDK 64-Bit Server VM 17.0.17+10 on Mac OS X 26.3.1
Apple M3 Ultra
var_pop:                                  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Spark                                                65             67           2         16.0          62.5       1.0X
Comet (Scan)                                         48             50           1         21.8          45.9       1.4X
Comet (Scan + Exec)                                  21             22           1         50.6          19.8       3.2X

Running benchmark: stddev_samp
  Running case: Spark
  Stopped after 30 iterations, 2034 ms
  Running case: Comet (Scan)
  Stopped after 40 iterations, 2035 ms
  Running case: Comet (Scan + Exec)
  Stopped after 93 iterations, 2017 ms

OpenJDK 64-Bit Server VM 17.0.17+10 on Mac OS X 26.3.1
Apple M3 Ultra
stddev_samp:                              Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Spark                                                65             68           1         16.0          62.3       1.0X
Comet (Scan)                                         49             51           1         21.2          47.1       1.3X
Comet (Scan + Exec)                                  21             22           1         50.8          19.7       3.2X

Running benchmark: stddev_pop
  Running case: Spark
  Stopped after 31 iterations, 2056 ms
  Running case: Comet (Scan)
  Stopped after 40 iterations, 2003 ms
  Running case: Comet (Scan + Exec)
  Stopped after 93 iterations, 2003 ms

OpenJDK 64-Bit Server VM 17.0.17+10 on Mac OS X 26.3.1
Apple M3 Ultra
stddev_pop:                               Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Spark                                                64             66           1         16.4          60.9       1.0X
Comet (Scan)                                         48             50           1         21.6          46.2       1.3X
Comet (Scan + Exec)                                  21             22           1         50.6          19.8       3.1X

Running benchmark: covar_samp
  Running case: Spark
  Stopped after 27 iterations, 2022 ms
  Running case: Comet (Scan)
  Stopped after 34 iterations, 2016 ms
  Running case: Comet (Scan + Exec)
  Stopped after 75 iterations, 2004 ms

OpenJDK 64-Bit Server VM 17.0.17+10 on Mac OS X 26.3.1
Apple M3 Ultra
covar_samp:                               Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Spark                                                73             75           2         14.4          69.6       1.0X
Comet (Scan)                                         56             59           2         18.6          53.7       1.3X
Comet (Scan + Exec)                                  26             27           1         40.9          24.5       2.8X

Running benchmark: covar_pop
  Running case: Spark
  Stopped after 27 iterations, 2018 ms
  Running case: Comet (Scan)
  Stopped after 35 iterations, 2027 ms
  Running case: Comet (Scan + Exec)
  Stopped after 76 iterations, 2021 ms

OpenJDK 64-Bit Server VM 17.0.17+10 on Mac OS X 26.3.1
Apple M3 Ultra
covar_pop:                                Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Spark                                                73             75           2         14.4          69.5       1.0X
Comet (Scan)                                         56             58           2         18.7          53.5       1.3X
Comet (Scan + Exec)                                  25             27           2         41.1          24.3       2.9X

Running benchmark: corr
  Running case: Spark
  Stopped after 25 iterations, 2067 ms
  Running case: Comet (Scan)
  Stopped after 31 iterations, 2053 ms
  Running case: Comet (Scan + Exec)
  Stopped after 61 iterations, 2000 ms

OpenJDK 64-Bit Server VM 17.0.17+10 on Mac OS X 26.3.1
Apple M3 Ultra
corr:                                     Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Spark                                                80             83           5         13.1          76.1       1.0X
Comet (Scan)                                         64             66           2         16.4          61.0       1.2X
Comet (Scan + Exec)                                  32             33           1         32.6          30.7       2.5X

@comphead
Contributor

comphead commented May 6, 2026

DataFusion's grouped aggregate operator has a fast path that uses
GroupsAccumulator instead of one Accumulator per group

Should we try to wire DF aggregation instead of Comet implementation? or there are any Spark inconsistencies?

@andygrove
Member Author

DataFusion's grouped aggregate operator has a fast path that uses
GroupsAccumulator instead of one Accumulator per group

Should we try to wire DF aggregation instead of Comet implementation? or there are any Spark inconsistencies?

They are not compatible the last time I checked - signed vs unsigned types, IIRC

@andygrove
Member Author

DataFusion's grouped aggregate operator has a fast path that uses
GroupsAccumulator instead of one Accumulator per group

Should we try to wire DF aggregation instead of Comet implementation? or there are any Spark inconsistencies?

They are not compatible the last time I checked - signed vs unsigned types, IIRC

would be good to upstream these to datafusion-spark and then see if they could share common code with the core impl

@mbutrovich
Contributor

Thanks for picking this up, @andygrove! The speedups are really nice to see and the refactor into welford.rs makes the per-row and grouped paths a lot easier to reason about together. A few things I wanted to ask about while reading through.

Correlation update_batch mask

In CorrelationGroupsAccumulator::update_batch we build is_not_null(a) & is_not_null(b) on every batch and AND it with opt_filter, then hand the combined mask to all three children. The per-row CorrelationAccumulator has a values[0].null_count() != 0 || values[1].null_count() != 0 short-circuit that skips the whole thing when both columns are dense. Could we add the same fast path here? Otherwise each child's hot loop hits the opt_filter branch for every row even when the input was fully non-null, and the allocation is paid regardless.
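
For illustration, the dense fast path being suggested might look like this std-only sketch, with plain bool slices standing in for Arrow validity bitmaps and the function name purely hypothetical:

```rust
// Hypothetical sketch: only materialize a combined mask when at least one
// input column actually has nulls; otherwise pass the filter straight through.
fn combined_mask(
    nulls_a: Option<&[bool]>, // Some(validity) only if column a has nulls
    nulls_b: Option<&[bool]>,
    opt_filter: Option<&[bool]>,
    len: usize,
) -> Option<Vec<bool>> {
    if nulls_a.is_none() && nulls_b.is_none() {
        // Both columns dense: no per-row AND, no allocation beyond the
        // filter clone; None means "every row participates".
        return opt_filter.map(|f| f.to_vec());
    }
    let mut mask = vec![true; len];
    // AND whichever of the three inputs are present into one mask.
    for m in [nulls_a, nulls_b, opt_filter].into_iter().flatten() {
        for (out, &bit) in mask.iter_mut().zip(m) {
            *out &= bit;
        }
    }
    Some(mask)
}

fn main() {
    // Dense columns, no filter: nothing to build.
    assert_eq!(combined_mask(None, None, None, 3), None);
    // One column has nulls: AND everything into a single mask once.
    let mask = combined_mask(
        Some(&[true, false, true]),
        None,
        Some(&[true, true, false]),
        3,
    );
    assert_eq!(mask, Some(vec![true, false, false]));
}
```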

Correlation evaluate doing three finalizes

CorrelationGroupsAccumulator::evaluate calls evaluate on covar, var1, and var2, each of which allocates a Float64Array and runs its own count==0 / sample divide-by-zero branch. We then unpack all three back to primitives and largely discard the metadata. Since correlation already reaches into covar.counts(), would it be cleaner and cheaper to iterate the raw counts, m2s, and algo_consts directly once? That would also remove the need for the pub(crate) counts() accessor, which currently exists only so correlation can snapshot counts before the children's evaluate drains their state.

finalize duplication between variance and covariance

VarianceGroupsAccumulator::finalize and CovarianceGroupsAccumulator::finalize look structurally identical, with the only difference being m2s vs algo_consts as the numerator. Would it be worth hoisting the shared body into something like welford::finalize_moments(counts, numerators, stats_type, null_on_divide_by_zero)? Feels like it would drop ~70 lines and keep the Spark sample-NaN vs null rule in one place.
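
A sketch of what that hoisted helper could look like (names hypothetical; the divisor and the empty-group-null / sample-NaN-vs-null rules follow the Spark semantics described in the PR):

```rust
// Hypothetical welford::finalize_moments sketch, shared by variance
// (numerators = m2s) and covariance (numerators = algo_consts).
#[derive(Clone, Copy, PartialEq)]
enum StatsType {
    Population,
    Sample,
}

fn finalize_moments(
    counts: &[f64],
    numerators: &[f64],
    stats_type: StatsType,
    null_on_divide_by_zero: bool,
) -> Vec<Option<f64>> {
    counts
        .iter()
        .zip(numerators)
        .map(|(&n, &num)| {
            if n == 0.0 {
                None // empty group yields null
            } else if stats_type == StatsType::Sample && n == 1.0 {
                // Spark rule: single-row sample stat is NaN in legacy
                // mode, null when null_on_divide_by_zero is set.
                if null_on_divide_by_zero { None } else { Some(f64::NAN) }
            } else {
                let divisor = match stats_type {
                    StatsType::Population => n,
                    StatsType::Sample => n - 1.0,
                };
                Some(num / divisor)
            }
        })
        .collect()
}

fn main() {
    let out = finalize_moments(&[0.0, 1.0, 4.0], &[0.0, 0.0, 6.0], StatsType::Sample, true);
    assert_eq!(out, vec![None, None, Some(2.0)]);
    let legacy = finalize_moments(&[1.0], &[0.0], StatsType::Sample, false);
    assert!(legacy[0].unwrap().is_nan());
}
```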

Stddev evaluate allocation

In StddevGroupsAccumulator::evaluate we do values.iter().map(|v| v.sqrt()).collect::<Vec<_>>() and rebuild a Float64Array. Arrow's PrimitiveArray::unary_mut or unary would run the sqrt in place on the buffer we already produced, avoiding the intermediate Vec<f64>. Not a huge win per emit but removes one allocation per output batch.
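
A std-only illustration of the difference (Arrow's unary / unary_mut would do the equivalent directly on the array buffer):

```rust
// Sketch: sqrt the variance buffer in place rather than collecting a second
// Vec<f64> and rebuilding a Float64Array from it.
fn sqrt_in_place(values: &mut [f64]) {
    for v in values.iter_mut() {
        *v = v.sqrt();
    }
}

fn main() {
    let mut variances = vec![4.0, 9.0, 2.25];
    sqrt_in_place(&mut variances);
    assert_eq!(variances, vec![2.0, 3.0, 1.5]);
}
```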

Covariance argument count

welford::covariance_merge takes 8 f64s and needs #[allow(clippy::too_many_arguments)], and covariance_update / covariance_retract each take 6. I wonder if a small CovarianceMoments { count, mean1, mean2, c } struct would read better? It would also let CovarianceGroupsAccumulator collapse its four parallel Vec<f64>s into a single Vec<CovarianceMoments>, which is friendlier on the cache when the same group index is revisited.
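
A sketch of the suggested grouping (struct and field names hypothetical): one struct per group keeps count / mean1 / mean2 / c adjacent in memory, so revisiting a group index touches one contiguous slot instead of four separate Vecs.

```rust
// Hypothetical CovarianceMoments sketch; replaces four parallel Vec<f64>s
// with a single Vec<CovarianceMoments>.
#[derive(Clone, Copy, Default)]
struct CovarianceMoments {
    count: f64,
    mean1: f64,
    mean2: f64,
    c: f64, // co-moment: running sum of (x - mean1) * (y - mean2)
}

impl CovarianceMoments {
    /// Welford-style bivariate update; stands in for the 6-argument
    /// covariance_update free function mentioned above.
    fn update(&mut self, x: f64, y: f64) {
        self.count += 1.0;
        let d1 = x - self.mean1;
        self.mean1 += d1 / self.count;
        let d2 = y - self.mean2;
        self.mean2 += d2 / self.count;
        // Uses the pre-update delta of x and post-update mean of y,
        // per the standard online co-moment recurrence.
        self.c += d1 * (y - self.mean2);
    }
}

fn main() {
    let mut m = CovarianceMoments::default();
    m.update(1.0, 2.0);
    m.update(3.0, 6.0);
    // covar_pop = c / count for the pair (1,2), (3,6)
    assert!((m.c / m.count - 2.0).abs() < 1e-12);
}
```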

Redundant null_on_divide_by_zero on Correlation

CorrelationGroupsAccumulator stores null_on_divide_by_zero on the struct and also passes it into all three children. The children's copy looks unreachable because the top-level evaluate short-circuits at count <= 1 before delegating. Is there a reason to keep both, or could we drop one side?

DataFusion accumulate helpers

Have you looked at datafusion::functions-aggregate-common::accumulate and accumulate_multiple? DF's own VarianceGroupsAccumulator::update_batch uses them to handle opt_filter + null skip in a single cache-friendly pass. I think the Spark f64 count requirement is what blocks direct reuse of DF's accumulator, but the update loop itself might still be a nice win here. Curious whether you considered this and found it not worth it.

Stddev test coverage

Variance has sample_single_row_nan_legacy and sample_single_row_null_when_flag_set, and correlation has the equivalent pair. Stddev grouped tests only cover pop-single-group and empty-group. Stddev wraps variance so the behavior is covered indirectly, but could we mirror those two tests so the Spark contract is explicit in the stddev file too?

CometBenchmarkBase shuffle manager

The spark.shuffle.manager = CometShuffleManager line added to CometBenchmarkBase caught my eye. The aggregate benchmarks in this PR run local[1] and don't cross a shuffle boundary, and the change applies to every benchmark that extends the base trait. Does it belong in its own PR, or moved into the specific benchmark that needed it? I want to make sure the 1.9x-3.7x numbers were measured against the same harness the previous baseline used.

Overall very nice work, the unit tests do a good job of pinning down the Spark-specific edges and the state ordering comments on correlation are helpful for future readers.

Contributor

@mbutrovich mbutrovich left a comment


I guess my comments above count as changes requested. Thanks @andygrove!

