Skip to content

[AURON #2352] Lock spills before new spiller#2350

Open
wForget wants to merge 2 commits into
apache:masterfrom
wForget:dev
Open

[AURON #2352] Lock spills before new spiller#2350
wForget wants to merge 2 commits into
apache:masterfrom
wForget:dev

Conversation

@wForget

@wForget wForget commented Jun 22, 2026

Copy link
Copy Markdown
Member

Which issue does this PR close?

Closes #2352

Rationale for this change

try_new_spill always uses spills.len as the spill_id, so we should always lock spills before trying_new_spill.

What changes are included in this PR?

Are there any user-facing changes?

How was this patch tested?

@wForget wForget marked this pull request as ready for review June 22, 2026 13:00
@wForget wForget changed the title fix: lock spills before new spiller [AURON #2352] Lock spills before new spiller Jun 23, 2026
@wForget wForget requested a review from richox June 23, 2026 02:59
@cxzl25 cxzl25 requested a review from Copilot June 23, 2026 05:54

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copilot was unable to review this pull request because the user who requested the review has reached their quota limit.

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 1 out of 1 changed files in this pull request and generated 2 comments.

Comment on lines +101 to 108
let mut spills = self.spills.lock().await;
let spill = tokio::task::spawn_blocking(move || {
let mut spill = try_new_spill(&spill_metrics)?;
let offsets = data.write(spill.get_buf_writer())?;
Ok::<_, DataFusionError>(Offsetted::new(offsets, spill))
})
.await
.expect("tokio spawn_blocking error")?;
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
async fn spill(&self) -> Result<()> {
let data = self.data.lock().await.drain();
let spill_metrics = self.exec_ctx.spill_metrics().clone();
let mut spills = self.spills.lock().await;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this file the on-heap spill_id comes from newSpill() (auron-memmgr/src/spill.rs:182) and try_new_spill takes no index, so spills.len() doesn't feed a spill id here the way it does in agg_table.rs (try_into_spill(spill, spill_idx)). Is the race from the issue actually reachable in sort_repartitioner, or is this more defensive? Either's fine — just want to make sure I'm reading it right.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found this issue while reviewing the code, and didn't encounter any actual failures. However, this race condition is possible; although sort_repartitioner doesn't take spill_id, it still affects the spill index on the JVM side. If I understand correctly, we should always ensure that the native-side spills length is consistent with the JVM-side length, so we need to lock the spills first.

val spill = OnHeapSpill(this, spills.length)
spills.append(Some(spill))

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

bug: lock spills before new spiller in sort_repartitioner

3 participants