[AURON #2352] Lock spills before new spiller#2350
Conversation
| let mut spills = self.spills.lock().await; | ||
| let spill = tokio::task::spawn_blocking(move || { | ||
| let mut spill = try_new_spill(&spill_metrics)?; | ||
| let offsets = data.write(spill.get_buf_writer())?; | ||
| Ok::<_, DataFusionError>(Offsetted::new(offsets, spill)) | ||
| }) | ||
| .await | ||
| .expect("tokio spawn_blocking error")?; |
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
| async fn spill(&self) -> Result<()> { | ||
| let data = self.data.lock().await.drain(); | ||
| let spill_metrics = self.exec_ctx.spill_metrics().clone(); | ||
| let mut spills = self.spills.lock().await; |
There was a problem hiding this comment.
In this file the on-heap spill_id comes from newSpill() (auron-memmgr/src/spill.rs:182) and try_new_spill takes no index, so spills.len() doesn't feed a spill id here the way it does in agg_table.rs (try_into_spill(spill, spill_idx)). Is the race from the issue actually reachable in sort_repartitioner, or is this more defensive? Either's fine — just want to make sure I'm reading it right.
There was a problem hiding this comment.
I found this issue while reviewing the code, and didn't encounter any actual failures. However, this race condition is possible; although sort_repartitioner doesn't take spill_id, it still affects the spill index on the JVM side. If I understand correctly, we should always ensure that the native-side spills length is consistent with the JVM-side length, so we need to lock the spills first.
Which issue does this PR close?
Closes #2352
Rationale for this change
try_new_spillalways usesspills.lenas thespill_id, so we should always lockspillsbeforetrying_new_spill.What changes are included in this PR?
Are there any user-facing changes?
How was this patch tested?