Skip to content

feat: native collect_list / array_agg aggregate#4720

Open
andygrove wants to merge 2 commits into
apache:mainfrom
andygrove:feat-collect-list
Open

feat: native collect_list / array_agg aggregate#4720
andygrove wants to merge 2 commits into
apache:mainfrom
andygrove:feat-collect-list

Conversation

@andygrove

Copy link
Copy Markdown
Member

Which issue does this PR close?

Closes #2524.

Rationale for this change

collect_list and its alias array_agg are common aggregate functions that previously fell back to Spark, breaking native execution for many real workloads (notably plans that group rows into arrays before further processing). Adding native support keeps these queries on the Comet path.

What changes are included in this PR?

This change was scaffolded with the implement-comet-expression skill.

  • Add CollectList message to expr.proto and a new collectList = 18 arm in the AggExpr oneof.
  • Add CometCollectList serde in aggregates.scala and register classOf[CollectList] -> CometCollectList in QueryPlanSerde.
  • Wire the native side to datafusion_spark::function::aggregate::collect::SparkCollectList, the upstream Spark-compatible accumulator (Spark 3.4 through 4.1 use ignore_nulls = true semantics that match it). No Comet-local Rust function is added.
  • Extend the Partial-mode adjustOutputForNativeState fix in operators.scala to cover CollectList (same pattern already used for CollectSet: native produces ArrayType(elementType) while Spark declares the buffer as BinaryType).
  • Mark collect_list and array_agg as supported in the user-guide expression page.
  • Add an audit entry under docs/source/contributor-guide/expression-audits/agg_funcs.md covering Spark 3.4.3, 3.5.8, 4.0.1, 4.1.1.

How are these changes tested?

  • New SQL fixture spark/src/test/resources/sql-tests/expressions/aggregate/collect_list.sql exercises ~30 queries across types (boolean, byte/short/int/bigint, float/double including NaN/Inf/-0, string, binary, decimal up to (38,0), date, timestamp, struct, nested array), GROUP BY, NULLs, all-NULL groups, empty tables, single-row, mixed aggregates, multiple collect_list columns, DISTINCT, HAVING, the array_agg alias, INT/BIGINT boundary values, and an SPARK-17641 null-filter regression. The fixture runs under ConfigMatrix: parquet.enable.dictionary=false,true, so each query executes twice.
  • All 21 aggregate CometSqlFileTestSuite tests pass (./mvnw test -Dsuites="org.apache.comet.CometSqlFileTestSuite expressions/aggregate" -Dtest=none -Pspark-3.5), confirming no regression to collect_set or other aggregates.
  • cargo clippy --all-targets --workspace -- -D warnings is clean.

Wires Spark's CollectList aggregate to datafusion-spark's
SparkCollectList. array_agg, registered as a SQL alias of
CollectList in FunctionRegistry, is also covered.

Closes apache#2524.
@andygrove andygrove force-pushed the feat-collect-list branch from bf6b0c1 to c9c19e3 Compare June 24, 2026 16:31
@andygrove andygrove marked this pull request as ready for review June 24, 2026 17:01
@andygrove andygrove marked this pull request as draft June 24, 2026 19:26
…gates

A distinct aggregate combined with collect_list/collect_set produces a
multi-stage plan (Partial -> PartialMerge -> Final). CollectList/CollectSet
declare a BinaryType buffer in Spark but produce a native ArrayType state, so
Comet cannot read a Spark-produced Binary buffer, nor round-trip its own
ArrayType buffer across the intermediate PartialMerge stages. Both led to
native crashes ("could not cast Binary to List" / "cast List to Binary").

Force these multi-stage aggregates to fall back to Spark consistently:
- tag the feeding Partial when a PartialMerge stage of CollectList/CollectSet
  is present (CometExecRule.tagUnsafePartialAggregates), and
- fall back a PartialMerge stage whose buffer was produced by a Spark partial
  (CometBaseAggregate.doConvert).

Two-stage collect_list/collect_set continue to run natively.

Patch the upstream SPARK-22223 plan-shape test to disable Comet, since native
collect_list removes the ObjectHashAggregateExec it asserts on.

Enabling fully-native multi-stage execution is tracked in apache#4724.
@andygrove andygrove marked this pull request as ready for review June 25, 2026 13:54
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Integrate collect_list/array_agg to Comet

1 participant