From 9a99646254610ff79d5e98707625b0e8fda020d9 Mon Sep 17 00:00:00 2001 From: Sabin Floares Date: Thu, 18 Jun 2026 12:59:31 +0300 Subject: [PATCH] perf(fts): Eliminate SortExec from FTS queries --- rust/lance/src/io/exec/fts.rs | 14 +++++++++++--- rust/lance/src/io/exec/take.rs | 25 +++++++++++++++++++++++-- 2 files changed, 34 insertions(+), 5 deletions(-) diff --git a/rust/lance/src/io/exec/fts.rs b/rust/lance/src/io/exec/fts.rs index 240fc70bebc..6c0c910ac71 100644 --- a/rust/lance/src/io/exec/fts.rs +++ b/rust/lance/src/io/exec/fts.rs @@ -19,7 +19,8 @@ use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use datafusion::physical_plan::union::UnionExec; use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties}; use datafusion_physical_expr::expressions::Column; -use datafusion_physical_expr::{Distribution, EquivalenceProperties, Partitioning}; +use datafusion_physical_expr::{Distribution, EquivalenceProperties, Partitioning, PhysicalSortExpr}; +use arrow_schema::SortOptions; use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode}; use datafusion_physical_plan::metrics::{BaselineMetrics, Count}; use futures::future::try_join_all; @@ -189,6 +190,7 @@ impl MetricsCollector for FtsIndexMetrics { } } + #[derive(Debug)] pub struct MatchQueryExec { dataset: Arc, @@ -246,7 +248,10 @@ impl MatchQueryExec { prefilter_source: PreFilterSource, ) -> Self { let properties = Arc::new(PlanProperties::new( - EquivalenceProperties::new(FTS_SCHEMA.clone()), + EquivalenceProperties::new_with_orderings(FTS_SCHEMA.clone(), [[PhysicalSortExpr::new( + Arc::new(Column::new(SCORE_COL, 1)), + SortOptions { descending: true, nulls_first: true }, + )]]), Partitioning::RoundRobinBatch(1), EmissionType::Final, Boundedness::Bounded, @@ -281,7 +286,10 @@ impl MatchQueryExec { segments: Vec, ) -> Self { let properties = Arc::new(PlanProperties::new( - EquivalenceProperties::new(FTS_SCHEMA.clone()), + EquivalenceProperties::new_with_orderings(FTS_SCHEMA.clone(), [[PhysicalSortExpr::new( + Arc::new(Column::new(SCORE_COL, 1)), + SortOptions { descending: true, nulls_first: true }, + )]]), Partitioning::RoundRobinBatch(1), EmissionType::Final, Boundedness::Bounded, diff --git a/rust/lance/src/io/exec/take.rs b/rust/lance/src/io/exec/take.rs index 977a9c88dce..2943b63c5cc 100644 --- a/rust/lance/src/io/exec/take.rs +++ b/rust/lance/src/io/exec/take.rs @@ -20,7 +20,9 @@ use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use datafusion::physical_plan::{ DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, SendableRecordBatchStream, }; -use datafusion_physical_expr::EquivalenceProperties; +use datafusion_physical_expr::expressions::Column; +use datafusion_physical_expr::projection::ProjectionMapping; +use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr}; use futures::FutureExt; use futures::stream::{FuturesOrdered, Stream, StreamExt, TryStreamExt}; use lance_arrow::RecordBatchExt; @@ -477,12 +479,31 @@ impl TakeExec { &projection, )); let output_arrow = Arc::new(ArrowSchema::from(output_schema.as_ref())); + // Propagate input ordering through the schema change. TakeExec always + // places input fields first in the same order, so input field at index i + // maps to output field at index i. New dataset fields appended at the end + // have no ordering and are simply not included in the mapping. + let input_schema_ref = input.schema(); + let mapping_exprs = input_schema_ref.fields().iter().enumerate().map(|(i, f)| { + ( + Arc::new(Column::new(f.name(), i)) as Arc, + f.name().to_string(), + ) + }); + let eq_props = ProjectionMapping::try_new(mapping_exprs, &input_schema_ref) + .map(|m| { + input + .properties() + .equivalence_properties() + .project(&m, output_arrow.clone()) + }) + .unwrap_or_else(|_| EquivalenceProperties::new(output_arrow.clone())); let properties = Arc::new( input .properties() .as_ref() .clone() - .with_eq_properties(EquivalenceProperties::new(output_arrow.clone())), + .with_eq_properties(eq_props), ); Ok(Some(Self {