Skip to content

[AURON #2358] Support native last / last(ignoreNulls) aggregate#2359

Open
zhuxiangyi wants to merge 1 commit into
apache:masterfrom
zhuxiangyi:support-native-last-aggregate
Open

[AURON #2358] Support native last / last(ignoreNulls) aggregate#2359
zhuxiangyi wants to merge 1 commit into
apache:masterfrom
zhuxiangyi:support-native-last-aggregate

Conversation

@zhuxiangyi

@zhuxiangyi zhuxiangyi commented Jun 24, 2026

Copy link
Copy Markdown
Contributor

Which issue does this PR close?

Closes #2358

Rationale for this change

Auron currently accelerates first / first(ignoreNulls) natively, but last / last(ignoreNulls) fall back to the generic UDAF path (a JNI call back into the JVM), losing vectorized acceleration. This PR adds native last support.

What changes are included in this PR?

  • native (datafusion-ext-plans): add AggLast / AggLastIgnoresNull (agg/last.rs, agg/last_ignores_null.rs), mirroring the first columnar accumulators with "later value wins" semantics. Wire them through the AggFunction enum, create_agg, the protobuf contract (LAST / LAST_IGNORES_NULL), the protobuf::AggFunction -> AggFunction conversion, and the window-aggregate mapping.
  • spark-extension: add the Last expression conversion in NativeConverters.convertAggregateExpr; declare the Last native aggregate buffer schema in NativeAggBase.computeNativeAggBufferDataTypes ([dataType] for ignoreNulls, [dataType, Boolean] otherwise) so the partial -> shuffle -> final buffer schema matches the native side.

Are there any user-facing changes?

Yes. last(col) and last(col, ignoreNulls = true) are now executed natively (vectorized); previously they fell back to the UDAF path.

How was this patch tested?

  • Rust unit test agg_exec::test::test_agg_last: partial -> final two-phase aggregation over a nullable column, verifying last (keeps the last visited row including null) and last(ignoreNulls) (keeps the last non-null value).
  • Scala end-to-end test in AuronDataFrameAggregateSuite ("native last / last(ignoreNulls) aggregate", spark34 + spark35): a grouped aggregate exercising the full partial -> shuffle -> final native path, asserting correct values and that the plan offloads to NativeAggBase.

Implement native last / last(ignoreNulls) aggregates, mirroring the
existing first implementation with "later value wins" semantics:

- native: add AggLast / AggLastIgnoresNull (agg/last.rs,
  agg/last_ignores_null.rs); wire through the AggFunction enum,
  create_agg, the protobuf contract (LAST / LAST_IGNORES_NULL),
  the protobuf->AggFunction conversion, and the window-agg mapping.
- spark-extension: add the Last expression conversion in
  NativeConverters; declare the Last native aggregate buffer schema in
  NativeAggBase.computeNativeAggBufferDataTypes ([dataType] for
  ignoreNulls, [dataType, Boolean] otherwise) so the partial -> shuffle
  -> final buffer schema matches the native side.

Tests:
- Rust unit test agg_exec::test::test_agg_last (partial -> final, nulls).
- Scala e2e AuronDataFrameAggregateSuite "native last / last(ignoreNulls)
  aggregate" (spark34 + spark35), covering the partial -> shuffle ->
  final native path and asserting NativeAggBase offload.
@zhuxiangyi zhuxiangyi force-pushed the support-native-last-aggregate branch from 605140f to bcb152a Compare June 24, 2026 14:55
@slfan1989 slfan1989 requested a review from Copilot June 26, 2026 01:33
@slfan1989 slfan1989 self-assigned this Jun 26, 2026

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds native (vectorized) execution support for Spark’s last / last(ignoreNulls) aggregates by implementing corresponding native accumulators in datafusion-ext-plans and wiring the end-to-end Spark ↔ protobuf ↔ planner ↔ native execution path, with coverage in both Rust unit tests and Spark (Scala) integration tests.

Changes:

  • Implement native AggLast and AggLastIgnoresNull columnar accumulators (including two-phase partial→final merge behavior) and register them in the native agg factory.
  • Extend the protobuf contract and planner mappings to recognize LAST / LAST_IGNORES_NULL, including window aggregate mapping.
  • Add Spark-side aggregate conversion and native buffer schema declarations, plus Spark 3.4/3.5 end-to-end tests validating correctness and offload.

Reviewed changes

Copilot reviewed 12 out of 12 changed files in this pull request and generated no comments.

Show a summary per file
File Description
spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeAggBase.scala Declares native agg buffer schema for Last to match native accumulator layouts.
spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala Converts Spark Last aggregate into native protobuf LAST / LAST_IGNORES_NULL.
native-engine/datafusion-ext-plans/src/agg/mod.rs Registers new last modules and extends AggFunction enum with Last variants.
native-engine/datafusion-ext-plans/src/agg/last.rs Implements native last(col) accumulator with value + “visited” flag semantics.
native-engine/datafusion-ext-plans/src/agg/last_ignores_null.rs Implements native last(col, ignoreNulls=true) accumulator with overwrite-on-non-null semantics.
native-engine/datafusion-ext-plans/src/agg/agg.rs Wires AggFunction::{Last,LastIgnoresNull} into create_agg.
native-engine/datafusion-ext-plans/src/agg_exec.rs Adds Rust unit test validating two-phase aggregation semantics for both last modes.
native-engine/auron-planner/src/planner.rs Adds window-function mapping for protobuf Last variants to native AggFunction.
native-engine/auron-planner/src/lib.rs Adds protobuf→native AggFunction conversions for Last variants.
native-engine/auron-planner/proto/auron.proto Extends AggFunction protobuf enum with LAST / LAST_IGNORES_NULL.
auron-spark-tests/spark35/src/test/scala/org/apache/spark/sql/AuronDataFrameAggregateSuite.scala Adds Spark 3.5 E2E test asserting correctness and native offload for last.
auron-spark-tests/spark34/src/test/scala/org/apache/spark/sql/AuronDataFrameAggregateSuite.scala Adds Spark 3.4 E2E test asserting correctness and native offload for last.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Support native last / last(ignoreNulls) aggregate

3 participants