From 45587d934188b51dffbb36be08189b7593550782 Mon Sep 17 00:00:00 2001 From: yew1eb Date: Sat, 6 Dec 2025 20:22:33 +0800 Subject: [PATCH 1/4] [AURON1739] Support limit with offset --------- Co-authored-by: cxzl25 <3898450+cxzl25@users.noreply.github.com> --- native-engine/auron-serde/proto/auron.proto | 6 +- native-engine/auron-serde/src/from_proto.rs | 28 ++++-- .../datafusion-ext-plans/src/limit_exec.rs | 99 ++++++++++++++++--- .../apache/spark/sql/auron/ShimsImpl.scala | 46 ++++++--- .../auron/plan/NativeCollectLimitExec.scala | 4 +- .../auron/plan/NativeGlobalLimitExec.scala | 4 +- .../auron/plan/NativeLocalLimitExec.scala | 2 +- .../plan/NativePartialTakeOrderedExec.scala | 2 +- .../auron/plan/NativeTakeOrderedExec.scala | 5 +- .../apache/auron/exec/AuronExecSuite.scala | 95 +++++++++++++++++- .../spark/sql/auron/AuronConverters.scala | 12 ++- .../org/apache/spark/sql/auron/Shims.scala | 26 +++-- .../auron/plan/NativeCollectLimitBase.scala | 6 +- .../auron/plan/NativeGlobalLimitBase.scala | 3 +- .../auron/plan/NativeLocalLimitBase.scala | 2 +- .../auron/plan/NativeTakeOrderedBase.scala | 59 +++++------ 16 files changed, 310 insertions(+), 89 deletions(-) diff --git a/native-engine/auron-serde/proto/auron.proto b/native-engine/auron-serde/proto/auron.proto index 29e9f1134..30df0d22b 100644 --- a/native-engine/auron-serde/proto/auron.proto +++ b/native-engine/auron-serde/proto/auron.proto @@ -631,7 +631,8 @@ message SortExecNode { message FetchLimit { // wrap into a message to make it optional - uint64 limit = 1; + uint32 limit = 1; + uint32 offset = 2; } message PhysicalRepartition { @@ -705,7 +706,8 @@ enum AggMode { message LimitExecNode { PhysicalPlanNode input = 1; - uint64 limit = 2; + uint32 limit = 2; + uint32 offset = 3; } message FFIReaderExecNode { diff --git a/native-engine/auron-serde/src/from_proto.rs b/native-engine/auron-serde/src/from_proto.rs index 0caaad6ca..1188a89f2 100644 --- a/native-engine/auron-serde/src/from_proto.rs +++ b/native-engine/auron-serde/src/from_proto.rs @@ -315,12 +315,18 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode { panic!("Failed to parse physical sort expressions: {}", e); }); + let fetch = sort.fetch_limit.as_ref(); + let limit_for_sort = fetch.map(|f| f.limit as usize); + let offset = fetch.map(|f| f.offset as usize).unwrap_or(0); + let mut plan: Arc<dyn ExecutionPlan> = + Arc::new(SortExec::new(input, exprs, limit_for_sort)); + + if offset > 0 { + plan = Arc::new(LimitExec::new(plan, usize::MAX, offset)); + } + // always preserve partitioning - Ok(Arc::new(SortExec::new( - input, - exprs, - sort.fetch_limit.as_ref().map(|limit| limit.limit as usize), - ))) + Ok(plan) } PhysicalPlanType::BroadcastJoinBuildHashMap(bhm) => { let input: Arc<dyn ExecutionPlan> = convert_box_required!(bhm.input)?; @@ -501,7 +507,11 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode { } PhysicalPlanType::Limit(limit) => { let input: Arc<dyn ExecutionPlan> = convert_box_required!(limit.input)?; - Ok(Arc::new(LimitExec::new(input, limit.limit))) + Ok(Arc::new(LimitExec::new( + input, + limit.limit as usize, + limit.offset as usize, + ))) } PhysicalPlanType::FfiReader(ffi_reader) => { let schema = Arc::new(convert_required!(ffi_reader.schema)?); @@ -513,7 +523,11 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode { } PhysicalPlanType::CoalesceBatches(coalesce_batches) => { let input: Arc<dyn ExecutionPlan> = convert_box_required!(coalesce_batches.input)?; - Ok(Arc::new(LimitExec::new(input, coalesce_batches.batch_size))) + Ok(Arc::new(LimitExec::new( + input, + coalesce_batches.batch_size as usize, + 0, + ))) }
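
A note on the (limit, offset) pair being deserialized above: the Scala side passes Spark's convention through unchanged, in which limit is the end index of the kept range rather than a row count, so an operator emits rows [offset, limit). This is what the native test in this series assumes (LimitExec::new(input, 7, 5) yields exactly two rows). The following standalone sketch models that per-batch arithmetic in plain Rust; the names plan_slices and batch_sizes are illustrative, not the engine's Arrow-based code:

    // Standalone model of LimitExec's (limit, offset) handling. `limit` is the
    // end index in Spark's convention, so the rows kept are [offset, limit).
    // Returns one (start, len) slice per input batch that survives the limit.
    fn plan_slices(batch_sizes: &[usize], limit: usize, offset: usize) -> Vec<(usize, usize)> {
        let mut skip = offset;                            // rows still to drop
        let mut remaining = limit.saturating_sub(offset); // rows still to emit
        let mut slices = Vec::new();
        for &rows in batch_sizes {
            if remaining == 0 {
                break;
            }
            let start = skip.min(rows); // drop the front of this batch
            skip -= start;
            let take = (rows - start).min(remaining);
            if take > 0 {
                slices.push((start, take));
                remaining -= take;
            }
        }
        slices
    }

    fn main() {
        // LIMIT 2 OFFSET 5 reaches the native side as limit=7, offset=5,
        // mirroring the test_limit_with_skip case added by this patch:
        // the offset consumes all of batch 0 and one row of batch 1.
        assert_eq!(plan_slices(&[4, 4, 4], 7, 5), vec![(1, 2)]);
    }
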
PhysicalPlanType::Expand(expand) => { let schema = Arc::new(convert_required!(expand.schema)?); diff --git a/native-engine/datafusion-ext-plans/src/limit_exec.rs b/native-engine/datafusion-ext-plans/src/limit_exec.rs index 1b9f5892e..f975ffacc 100644 --- a/native-engine/datafusion-ext-plans/src/limit_exec.rs +++ b/native-engine/datafusion-ext-plans/src/limit_exec.rs @@ -41,16 +41,18 @@ use crate::common::execution_context::ExecutionContext; #[derive(Debug)] pub struct LimitExec { input: Arc<dyn ExecutionPlan>, - limit: u64, + limit: usize, + skip: usize, pub metrics: ExecutionPlanMetricsSet, props: OnceCell<PlanProperties>, } impl LimitExec { - pub fn new(input: Arc<dyn ExecutionPlan>, limit: u64) -> Self { + pub fn new(input: Arc<dyn ExecutionPlan>, limit: usize, skip: usize) -> Self { Self { input, limit, + skip, metrics: ExecutionPlanMetricsSet::new(), props: OnceCell::new(), } @@ -59,7 +61,7 @@ impl LimitExec { impl DisplayAs for LimitExec { fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { - write!(f, "LimitExec(limit={})", self.limit) + write!(f, "LimitExec(limit={},skip={})", self.limit, self.skip) } } @@ -95,7 +97,11 @@ impl ExecutionPlan for LimitExec { self: Arc<Self>, children: Vec<Arc<dyn ExecutionPlan>>, ) -> Result<Arc<dyn ExecutionPlan>> { - Ok(Arc::new(Self::new(children[0].clone(), self.limit))) + Ok(Arc::new(Self::new( + children[0].clone(), + self.limit, + self.skip, + ))) } fn execute( @@ -105,15 +111,19 @@ impl ExecutionPlan for LimitExec { ) -> Result<SendableRecordBatchStream> { let exec_ctx = ExecutionContext::new(context, partition, self.schema(), &self.metrics); let input = exec_ctx.execute_with_input_stats(&self.input)?; - execute_limit(input, self.limit, exec_ctx) + if self.skip == 0 { + execute_limit(input, self.limit, exec_ctx) + } else { + execute_limit_with_skip(input, self.limit, self.skip, exec_ctx) + } } fn statistics(&self) -> Result<Statistics> { Statistics::with_fetch( self.input.statistics()?, self.schema(), - Some(self.limit as usize), - 0, + Some(self.limit), + self.skip, 1, ) } } @@ -121,7 +131,7 @@ fn execute_limit( mut input: SendableRecordBatchStream, - limit: u64, + limit: usize, exec_ctx: Arc<ExecutionContext>, ) -> Result<SendableRecordBatchStream> { Ok(exec_ctx @@ -131,11 +141,49 @@ fn execute_limit( while remaining > 0 && let Some(mut batch) = input.next().await.transpose()? { - if remaining < batch.num_rows() as u64 { - batch = batch.slice(0, remaining as usize); + if remaining < batch.num_rows() { + batch = batch.slice(0, remaining); + remaining = 0; + } else { + remaining -= batch.num_rows(); + } + exec_ctx.baseline_metrics().record_output(batch.num_rows()); + sender.send(batch).await; + } + Ok(()) + })) +} + +fn execute_limit_with_skip( + mut input: SendableRecordBatchStream, + limit: usize, + offset: usize, + exec_ctx: Arc<ExecutionContext>, +) -> Result<SendableRecordBatchStream> { + Ok(exec_ctx + .clone() + .output_with_sender("Limit", move |sender| async move { + let mut skip = offset; + let mut remaining = limit - skip; + while remaining > 0 + && let Some(mut batch) = input.next().await.transpose()?
+ { + if skip > 0 { + let rows = batch.num_rows(); + if skip >= rows { + skip -= rows; + continue; + } + + batch = batch.slice(skip, rows - skip); + skip = 0; + } + + if remaining < batch.num_rows() { + batch = batch.slice(0, remaining); remaining = 0; } else { - remaining -= batch.num_rows() as u64; + remaining -= batch.num_rows(); } exec_ctx.baseline_metrics().record_output(batch.num_rows()); sender.send(batch).await; @@ -203,7 +251,7 @@ mod test { ("b", &vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]), ("c", &vec![5, 6, 7, 8, 9, 0, 1, 2, 3, 4]), ); - let limit_exec = LimitExec::new(input, 2_u64); + let limit_exec = LimitExec::new(input, 2, 0); let session_ctx = SessionContext::new(); let task_ctx = session_ctx.task_ctx(); let output = limit_exec.execute(0, task_ctx).unwrap(); @@ -222,4 +270,31 @@ mod test { assert_eq!(row_count, Precision::Exact(2)); Ok(()) } + + #[tokio::test] + async fn test_limit_with_skip() -> Result<()> { + let input = build_table( + ("a", &vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]), + ("b", &vec![9, 8, 7, 6, 5, 4, 3, 2, 1, 0]), + ("c", &vec![5, 6, 7, 8, 9, 0, 1, 2, 3, 4]), + ); + let limit_exec = LimitExec::new(input, 7, 5); + let session_ctx = SessionContext::new(); + let task_ctx = session_ctx.task_ctx(); + let output = limit_exec.execute(0, task_ctx).unwrap(); + let batches = common::collect(output).await?; + let row_count: usize = batches.iter().map(|batch| batch.num_rows()).sum(); + + let expected = vec![ + "+---+---+---+", + "| a | b | c |", + "+---+---+---+", + "| 5 | 4 | 0 |", + "| 6 | 3 | 1 |", + "+---+---+---+", + ]; + assert_batches_eq!(expected, &batches); + assert_eq!(row_count, 2); + Ok(()) + } } diff --git a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala index 3acbbed92..4894fbf7c 100644 --- a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala +++ b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala @@ -50,14 +50,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.First import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.catalyst.plans.physical.BroadcastMode import org.apache.spark.sql.catalyst.plans.physical.Partitioning -import org.apache.spark.sql.execution.CoalescedPartitionSpec -import org.apache.spark.sql.execution.FileSourceScanExec -import org.apache.spark.sql.execution.PartialMapperPartitionSpec -import org.apache.spark.sql.execution.PartialReducerPartitionSpec -import org.apache.spark.sql.execution.ShuffledRowRDD -import org.apache.spark.sql.execution.ShufflePartitionSpec -import org.apache.spark.sql.execution.SparkPlan -import org.apache.spark.sql.execution.UnaryExecNode +import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, BroadcastQueryStageExec, QueryStageExec, ShuffleQueryStageExec} import org.apache.spark.sql.execution.auron.plan._ import org.apache.spark.sql.execution.auron.plan.ConvertToNativeExec @@ -288,16 +281,38 @@ class ShimsImpl extends Shims with Logging { child: SparkPlan): NativeGenerateBase = NativeGenerateExec(generator, requiredChildOutput, outer, generatorOutput, child) - override def createNativeGlobalLimitExec(limit: Long, child: SparkPlan): NativeGlobalLimitBase = - NativeGlobalLimitExec(limit, child) + private def effectiveLimit(rawLimit: Int): Int = + if (rawLimit == -1) Int.MaxValue else rawLimit - override def 
createNativeLocalLimitExec(limit: Long, child: SparkPlan): NativeLocalLimitBase = + @sparkver("3.4 / 3.5") + override def getLimitAndOffset(plan: GlobalLimitExec): (Int, Int) = { + (effectiveLimit(plan.limit), plan.offset) + } + + @sparkver("3.4 / 3.5") + override def getLimitAndOffset(plan: TakeOrderedAndProjectExec): (Int, Int) = { + (effectiveLimit(plan.limit), plan.offset) + } + + override def createNativeGlobalLimitExec( + limit: Int, + offset: Int, + child: SparkPlan): NativeGlobalLimitBase = + NativeGlobalLimitExec(limit, offset, child) + + override def createNativeLocalLimitExec(limit: Int, child: SparkPlan): NativeLocalLimitBase = NativeLocalLimitExec(limit, child) + @sparkver("3.4 / 3.5") + override def getLimitAndOffset(plan: CollectLimitExec): (Int, Int) = { + (effectiveLimit(plan.limit), plan.offset) + } + override def createNativeCollectLimitExec( limit: Int, + offset: Int, child: SparkPlan): NativeCollectLimitBase = - NativeCollectLimitExec(limit, child) + NativeCollectLimitExec(limit, offset, child) override def createNativeParquetInsertIntoHiveTableExec( cmd: InsertIntoHiveTable, @@ -334,13 +349,14 @@ class ShimsImpl extends Shims with Logging { NativeSortExec(sortOrder, global, child) override def createNativeTakeOrderedExec( - limit: Long, + limit: Int, + offset: Int, sortOrder: Seq[SortOrder], child: SparkPlan): NativeTakeOrderedBase = - NativeTakeOrderedExec(limit, sortOrder, child) + NativeTakeOrderedExec(limit, offset, sortOrder, child) override def createNativePartialTakeOrderedExec( - limit: Long, + limit: Int, sortOrder: Seq[SortOrder], child: SparkPlan, metrics: Map[String, SQLMetric]): NativePartialTakeOrderedBase = diff --git a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeCollectLimitExec.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeCollectLimitExec.scala index ba514ab7d..4ff7d8049 100644 --- a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeCollectLimitExec.scala +++ b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeCollectLimitExec.scala @@ -20,8 +20,8 @@ import org.apache.spark.sql.execution.SparkPlan import org.apache.auron.sparkver -case class NativeCollectLimitExec(limit: Int, override val child: SparkPlan) - extends NativeCollectLimitBase(limit, child) { +case class NativeCollectLimitExec(limit: Int, offset: Int, override val child: SparkPlan) + extends NativeCollectLimitBase(limit, offset, child) { @sparkver("3.2 / 3.3 / 3.4 / 3.5") override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = diff --git a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGlobalLimitExec.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGlobalLimitExec.scala index 8aba2de4c..1b493f432 100644 --- a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGlobalLimitExec.scala +++ b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGlobalLimitExec.scala @@ -20,8 +20,8 @@ import org.apache.spark.sql.execution.SparkPlan import org.apache.auron.sparkver -case class NativeGlobalLimitExec(limit: Long, override val child: SparkPlan) - extends NativeGlobalLimitBase(limit, child) { +case class NativeGlobalLimitExec(limit: Int, offset: Int, override val child: SparkPlan) + extends NativeGlobalLimitBase(limit, offset, 
child) { @sparkver("3.2 / 3.3 / 3.4 / 3.5") override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = diff --git a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeLocalLimitExec.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeLocalLimitExec.scala index ad74b02ec..805408c8d 100644 --- a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeLocalLimitExec.scala +++ b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeLocalLimitExec.scala @@ -20,7 +20,7 @@ import org.apache.spark.sql.execution.SparkPlan import org.apache.auron.sparkver -case class NativeLocalLimitExec(limit: Long, override val child: SparkPlan) +case class NativeLocalLimitExec(limit: Int, override val child: SparkPlan) extends NativeLocalLimitBase(limit, child) { @sparkver("3.2 / 3.3 / 3.4 / 3.5") diff --git a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativePartialTakeOrderedExec.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativePartialTakeOrderedExec.scala index e243e6f37..faf541a66 100644 --- a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativePartialTakeOrderedExec.scala +++ b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativePartialTakeOrderedExec.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.auron.sparkver case class NativePartialTakeOrderedExec( - limit: Long, + limit: Int, sortOrder: Seq[SortOrder], override val child: SparkPlan, override val metrics: Map[String, SQLMetric]) diff --git a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeTakeOrderedExec.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeTakeOrderedExec.scala index cec298b64..165481558 100644 --- a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeTakeOrderedExec.scala +++ b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeTakeOrderedExec.scala @@ -22,10 +22,11 @@ import org.apache.spark.sql.execution.SparkPlan import org.apache.auron.sparkver case class NativeTakeOrderedExec( - limit: Long, + limit: Int, + offset: Int, sortOrder: Seq[SortOrder], override val child: SparkPlan) - extends NativeTakeOrderedBase(limit, sortOrder, child) { + extends NativeTakeOrderedBase(limit, offset, sortOrder, child) { @sparkver("3.2 / 3.3 / 3.4 / 3.5") override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = diff --git a/spark-extension-shims-spark/src/test/scala/org/apache/auron/exec/AuronExecSuite.scala b/spark-extension-shims-spark/src/test/scala/org/apache/auron/exec/AuronExecSuite.scala index d7adf3a74..7f62dd521 100644 --- a/spark-extension-shims-spark/src/test/scala/org/apache/auron/exec/AuronExecSuite.scala +++ b/spark-extension-shims-spark/src/test/scala/org/apache/auron/exec/AuronExecSuite.scala @@ -17,13 +17,14 @@ package org.apache.auron.exec import org.apache.spark.sql.AuronQueryTest -import org.apache.spark.sql.execution.auron.plan.NativeCollectLimitExec +import org.apache.spark.sql.execution.auron.plan.{NativeCollectLimitExec, NativeGlobalLimitExec, NativeLocalLimitExec, NativeTakeOrderedExec} import org.apache.auron.BaseAuronSQLSuite +import 
org.apache.auron.util.AuronTestUtils class AuronExecSuite extends AuronQueryTest with BaseAuronSQLSuite { - test("Collect Limit") { + test("CollectLimit") { withTable("t1") { sql("create table t1(id INT) using parquet") sql("insert into t1 values(1),(2),(3),(3),(3),(4),(5),(6),(7),(8),(9),(10)") @@ -36,4 +37,94 @@ class AuronExecSuite extends AuronQueryTest with BaseAuronSQLSuite { } } + test("CollectLimit with offset") { + if (AuronTestUtils.isSparkV34OrGreater) { + withTempView("t1") { + sql("create table if not exists t1(id INT) using parquet") + sql("insert into t1 values(1),(2),(3),(3),(3),(4),(5),(6),(7),(8),(9),(10)") + Seq((5, 0), (10, 0), (5, 2), (10, 2), (1, 5), (3, 8)).foreach { + case (limit, offset) => { + val query = s"select * from t1 limit $limit offset $offset" + val df = checkSparkAnswerAndOperator(() => spark.sql(query)) + assert(collect(df.queryExecution.executedPlan) { case e: NativeCollectLimitExec => + e + }.size == 1) + } + } + } + } + } + + test("GlobalLimit and LocalLimit") { + withTempView("t1") { + sql("create table if not exists t1(id INT) using parquet") + sql("insert into t1 values(1),(2),(3),(3),(3),(4),(5),(6),(7),(8),(9),(10)") + val df = checkSparkAnswerAndOperator(() => + spark + .sql(s""" + |select id from ( + | select * from t1 limit 5 + |) where id > 0 limit 10; + |""".stripMargin) + .groupBy("id") + .count()) + assert(collect(df.queryExecution.executedPlan) { + case e: NativeGlobalLimitExec => e + case e: NativeLocalLimitExec => e + }.size >= 2) + } + } + + test("GlobalLimit with offset") { + if (AuronTestUtils.isSparkV34OrGreater) { + withTempView("t1") { + sql("create table if not exists t1(id INT) using parquet") + sql("insert into t1 values(1),(2),(3),(3),(3),(4),(5),(6),(7),(8),(9),(10)") + Seq((5, 0), (10, 0), (5, 2), (10, 2), (1, 5), (3, 8)).foreach { + case (limit, offset) => { + val query = s"select * from t1 limit $limit offset $offset" + val df = checkSparkAnswerAndOperator(() => spark.sql(query).groupBy("id").count()) + assert(collect(df.queryExecution.executedPlan) { case e: NativeGlobalLimitExec => + e + }.size == 1) + } + } + } + } + } + + test("TakeOrderedAndProject") { + withTempView("t1") { + sql("create table if not exists t1(id INT) using parquet") + sql("insert into t1 values(1),(2),(3),(3),(3),(4),(5),(6),(7),(8),(9),(10)") + val df = checkSparkAnswerAndOperator(() => + spark + .sql(s""" + | select id from t1 order by id limit 5 + |""".stripMargin) + .groupBy("id") + .count()) + assert(collect(df.queryExecution.executedPlan) { case e: NativeTakeOrderedExec => + e + }.size == 1) + } + } + + test("TakeOrderedAndProject with offset") { + if (AuronTestUtils.isSparkV34OrGreater) { + withTempView("t1") { + sql("create table if not exists t1(id INT) using parquet") + sql("insert into t1 values(1),(2),(3),(3),(3),(4),(5),(6),(7),(8),(9),(10)") + Seq((5, 0), (10, 0), (5, 2), (10, 2), (1, 5), (3, 8)).foreach { + case (limit, offset) => { + val query = s"select * from t1 order by id limit $limit offset $offset" + val df = checkSparkAnswerAndOperator(() => spark.sql(query).groupBy("id").count()) + assert(collect(df.queryExecution.executedPlan) { case e: NativeTakeOrderedExec => + e + }.size == 1) + } + } + } + } + } } diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala index 413ad7be5..7b550d6ff 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala +++ 
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala @@ -766,18 +766,21 @@ object AuronConverters extends Logging { def convertLocalLimitExec(exec: LocalLimitExec): SparkPlan = { logDebugPlanConversion(exec) - Shims.get.createNativeLocalLimitExec(exec.limit.toLong, exec.child) + Shims.get.createNativeLocalLimitExec(exec.limit, exec.child) } def convertGlobalLimitExec(exec: GlobalLimitExec): SparkPlan = { logDebugPlanConversion(exec) - Shims.get.createNativeGlobalLimitExec(exec.limit.toLong, exec.child) + val (limit, offset) = Shims.get.getLimitAndOffset(exec) + Shims.get.createNativeGlobalLimitExec(limit, offset, exec.child) } def convertTakeOrderedAndProjectExec(exec: TakeOrderedAndProjectExec): SparkPlan = { logDebugPlanConversion(exec) + val (limit, offset) = Shims.get.getLimitAndOffset(exec) val nativeTakeOrdered = Shims.get.createNativeTakeOrderedExec( - exec.limit, + limit, + offset, exec.sortOrder, addRenameColumnsExec(convertToNative(exec.child))) @@ -791,7 +794,8 @@ object AuronConverters extends Logging { def convertCollectLimitExec(exec: CollectLimitExec): SparkPlan = { logDebugPlanConversion(exec) - Shims.get.createNativeCollectLimitExec(exec.limit, exec.child) + val (limit, offset) = Shims.get.getLimitAndOffset(exec) + Shims.get.createNativeCollectLimitExec(limit, offset, exec.child) } def convertHashAggregateExec(exec: HashAggregateExec): SparkPlan = { diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/Shims.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/Shims.scala index a192e1982..58c08c4d8 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/Shims.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/Shims.scala @@ -39,8 +39,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.catalyst.plans.physical.BroadcastMode import org.apache.spark.sql.catalyst.plans.physical.Partitioning -import org.apache.spark.sql.execution.FileSourceScanExec -import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.{CollectLimitExec, FileSourceScanExec, GlobalLimitExec, SparkPlan, TakeOrderedAndProjectExec} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec import org.apache.spark.sql.execution.auron.plan._ import org.apache.spark.sql.execution.auron.plan.NativeBroadcastJoinBase @@ -119,11 +118,21 @@ abstract class Shims { generatorOutput: Seq[Attribute], child: SparkPlan): NativeGenerateBase - def createNativeGlobalLimitExec(limit: Long, child: SparkPlan): NativeGlobalLimitBase + def getLimitAndOffset(plan: GlobalLimitExec): (Int, Int) = (plan.limit, 0) - def createNativeLocalLimitExec(limit: Long, child: SparkPlan): NativeLocalLimitBase + def createNativeGlobalLimitExec( + limit: Int, + offset: Int, + child: SparkPlan): NativeGlobalLimitBase - def createNativeCollectLimitExec(limit: Int, child: SparkPlan): NativeCollectLimitBase + def createNativeLocalLimitExec(limit: Int, child: SparkPlan): NativeLocalLimitBase + + def getLimitAndOffset(plan: CollectLimitExec): (Int, Int) = (plan.limit, 0) + + def createNativeCollectLimitExec( + limit: Int, + offset: Int, + child: SparkPlan): NativeCollectLimitBase def createNativeParquetInsertIntoHiveTableExec( cmd: InsertIntoHiveTable, @@ -151,13 +160,16 @@ abstract class Shims { global: Boolean, child: SparkPlan): NativeSortBase + def getLimitAndOffset(plan: TakeOrderedAndProjectExec): (Int, Int) = 
(plan.limit, 0) + def createNativeTakeOrderedExec( - limit: Long, + limit: Int, + offset: Int, sortOrder: Seq[SortOrder], child: SparkPlan): NativeTakeOrderedBase def createNativePartialTakeOrderedExec( - limit: Long, + limit: Int, sortOrder: Seq[SortOrder], child: SparkPlan, metrics: Map[String, SQLMetric]): NativePartialTakeOrderedBase diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeCollectLimitBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeCollectLimitBase.scala index f8315ed11..d33661030 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeCollectLimitBase.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeCollectLimitBase.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.auron.metric.SparkMetricNode import org.apache.auron.protobuf.{LimitExecNode, PhysicalPlanNode} -abstract class NativeCollectLimitBase(limit: Int, override val child: SparkPlan) +abstract class NativeCollectLimitBase(limit: Int, offset: Int, override val child: SparkPlan) extends UnaryExecNode with NativeSupports { override def output: Seq[Attribute] = child.output @@ -51,7 +51,8 @@ abstract class NativeCollectLimitBase(limit: Int, override val child: SparkPlan) val row = it.next().copy() buf += row } - buf.toArray + val rows = buf.toArray + if (offset > 0) rows.drop(offset) else rows } override def doExecuteNative(): NativeRDD = { @@ -78,6 +79,7 @@ abstract class NativeCollectLimitBase(limit: Int, override val child: SparkPlan) .newBuilder() .setInput(singlePartitionRDD.nativePlan(inputPartition, taskContext)) .setLimit(limit) + .setOffset(offset) .build() PhysicalPlanNode.newBuilder().setLimit(nativeLimitExec).build() }, diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGlobalLimitBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGlobalLimitBase.scala index 83c1f7d85..e8c54d47e 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGlobalLimitBase.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGlobalLimitBase.scala @@ -35,7 +35,7 @@ import org.apache.auron.metric.SparkMetricNode import org.apache.auron.protobuf.LimitExecNode import org.apache.auron.protobuf.PhysicalPlanNode -abstract class NativeGlobalLimitBase(limit: Long, override val child: SparkPlan) +abstract class NativeGlobalLimitBase(limit: Int, offset: Int, override val child: SparkPlan) extends UnaryExecNode with NativeSupports { @@ -67,6 +67,7 @@ abstract class NativeGlobalLimitBase(limit: Long, override val child: SparkPlan) .newBuilder() .setInput(inputRDD.nativePlan(inputPartition, taskContext)) .setLimit(limit) + .setOffset(offset) .build() PhysicalPlanNode.newBuilder().setLimit(nativeLimitExec).build() }, diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeLocalLimitBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeLocalLimitBase.scala index f0ba9e3bd..101867283 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeLocalLimitBase.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeLocalLimitBase.scala @@ -33,7 +33,7 @@ import org.apache.auron.metric.SparkMetricNode import 
org.apache.auron.protobuf.LimitExecNode import org.apache.auron.protobuf.PhysicalPlanNode -abstract class NativeLocalLimitBase(limit: Long, override val child: SparkPlan) +abstract class NativeLocalLimitBase(limit: Int, override val child: SparkPlan) extends UnaryExecNode with NativeSupports { diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeTakeOrderedBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeTakeOrderedBase.scala index b3a5b7fe3..dd7ed2f0b 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeTakeOrderedBase.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeTakeOrderedBase.scala @@ -47,7 +47,8 @@ import org.apache.auron.protobuf.PhysicalSortExprNode import org.apache.auron.protobuf.SortExecNode abstract class NativeTakeOrderedBase( - limit: Long, + limit: Int, + offset: Int, sortOrder: Seq[SortOrder], override val child: SparkPlan) extends UnaryExecNode @@ -81,35 +82,37 @@ abstract class NativeTakeOrderedBase( val ord = new LazilyGeneratedOrdering(sortOrder, output) // all partitions are sorted, so perform a sorted-merge to achieve the result - partial - .execute() - .map(_.copy()) - .mapPartitions(iter => Iterator.single(iter.toArray)) - .reduce { case (array1, array2) => - val result = ArrayBuffer[InternalRow]() - var i = 0 - var j = 0 - - while (result.length < limit && (i < array1.length || j < array2.length)) { - 0 match { - case _ if i == array1.length => - result.append(array2(j)) - j += 1 - case _ if j == array2.length => - result.append(array1(i)) - i += 1 - case _ => - if (ord.compare(array1(i), array2(j)) <= 0) { - result.append(array1(i)) - i += 1 - } else { + val rows = + partial + .execute() + .map(_.copy()) + .mapPartitions(iter => Iterator.single(iter.toArray)) + .reduce { case (array1, array2) => + val result = ArrayBuffer[InternalRow]() + var i = 0 + var j = 0 + + while (result.length < limit && (i < array1.length || j < array2.length)) { + 0 match { + case _ if i == array1.length => result.append(array2(j)) j += 1 - } + case _ if j == array2.length => + result.append(array1(i)) + i += 1 + case _ => + if (ord.compare(array1(i), array2(j)) <= 0) { + result.append(array1(i)) + i += 1 + } else { + result.append(array2(j)) + j += 1 + } + } } + result.toArray } - result.toArray - } + if (offset > 0) rows.drop(offset) else rows } // check whether native converting is supported @@ -141,7 +144,7 @@ abstract class NativeTakeOrderedBase( .newBuilder() .setInput(shuffledRDD.nativePlan(inputPartition, taskContext)) .addAllExpr(nativeSortExprs.asJava) - .setFetchLimit(FetchLimit.newBuilder().setLimit(limit)) + .setFetchLimit(FetchLimit.newBuilder().setLimit(limit).setOffset(offset)) .build() PhysicalPlanNode.newBuilder().setSort(nativeTakeOrderedExec).build() }, @@ -150,7 +153,7 @@ abstract class NativeTakeOrderedBase( } abstract class NativePartialTakeOrderedBase( - limit: Long, + limit: Int, sortOrder: Seq[SortOrder], override val child: SparkPlan, override val metrics: Map[String, SQLMetric]) From 274d98c9998964603ac9e4292c0a2b7ff2cf6ecd Mon Sep 17 00:00:00 2001 From: yew1eb Date: Tue, 23 Dec 2025 16:49:25 +0800 Subject: [PATCH 2/4] up --- native-engine/auron-serde/src/from_proto.rs | 10 +-- .../datafusion-ext-plans/src/sort_exec.rs | 85 +++++++++++++++++-- 2 files changed, 79 insertions(+), 16 deletions(-) diff --git a/native-engine/auron-serde/src/from_proto.rs b/native-engine/auron-serde/src/from_proto.rs 
index 1188a89f2..ac76f7c87 100644 --- a/native-engine/auron-serde/src/from_proto.rs +++ b/native-engine/auron-serde/src/from_proto.rs @@ -316,17 +316,11 @@ impl TryInto> for &protobuf::PhysicalPlanNode { }); let fetch = sort.fetch_limit.as_ref(); - let limit_for_sort = fetch.map(|f| f.limit as usize); let offset = fetch.map(|f| f.offset as usize).unwrap_or(0); - let mut plan: Arc = - Arc::new(SortExec::new(input, exprs, limit_for_sort)); - - if offset > 0 { - plan = Arc::new(LimitExec::new(plan, usize::MAX, offset)); - } + let limit = fetch.map(|f| f.limit as usize); // always preserve partitioning - Ok(plan) + Ok(Arc::new(SortExec::new(input, exprs, offset, limit))) } PhysicalPlanType::BroadcastJoinBuildHashMap(bhm) => { let input: Arc = convert_box_required!(bhm.input)?; diff --git a/native-engine/datafusion-ext-plans/src/sort_exec.rs b/native-engine/datafusion-ext-plans/src/sort_exec.rs index 24d0beb6b..74887b483 100644 --- a/native-engine/datafusion-ext-plans/src/sort_exec.rs +++ b/native-engine/datafusion-ext-plans/src/sort_exec.rs @@ -86,7 +86,8 @@ const NUM_MAX_MERGING_BATCHES: usize = 32; pub struct SortExec { input: Arc, exprs: Vec, - fetch: Option, + skip: usize, + limit: Option, metrics: ExecutionPlanMetricsSet, record_output: bool, props: OnceCell, @@ -96,13 +97,15 @@ impl SortExec { pub fn new( input: Arc, exprs: Vec, - fetch: Option, + skip: usize, + limit: Option, ) -> Self { let metrics = ExecutionPlanMetricsSet::new(); Self { input, exprs, - fetch, + skip, + limit, metrics, record_output: true, props: OnceCell::new(), @@ -129,6 +132,7 @@ pub fn create_default_ascending_sort_exec( options: Default::default(), }) .collect(), + 0, None, ); if let Some(execution_plan_metrics) = execution_plan_metrics { @@ -185,7 +189,8 @@ impl ExecutionPlan for SortExec { Ok(Arc::new(Self::new( children[0].clone(), self.exprs.clone(), - self.fetch, + self.skip, + self.limit, ))) } @@ -203,7 +208,13 @@ impl ExecutionPlan for SortExec { } fn statistics(&self) -> Result { - Statistics::with_fetch(self.input.statistics()?, self.schema(), self.fetch, 0, 1) + Statistics::with_fetch( + self.input.statistics()?, + self.schema(), + self.limit, + self.skip, + 1, + ) } } @@ -223,7 +234,8 @@ impl SortExec { mem_consumer_info: None, weak: Weak::new(), prune_sort_keys_from_batch: prune_sort_keys_from_batch.clone(), - limit: self.fetch.unwrap_or(usize::MAX), + skip: self.skip, + limit: self.limit.unwrap_or(usize::MAX), record_output: self.record_output, in_mem_blocks: Default::default(), spills: Default::default(), @@ -336,6 +348,7 @@ struct ExternalSorter { mem_consumer_info: Option>, weak: Weak, prune_sort_keys_from_batch: Arc, + skip: usize, limit: usize, record_output: bool, in_mem_blocks: Arc>>, @@ -704,6 +717,9 @@ impl ExternalSorter { let in_mem_blocks = std::mem::take(&mut *self.in_mem_blocks.lock()); if !in_mem_blocks.is_empty() { let mut merger = Merger::try_new(self.clone(), in_mem_blocks)?; + if self.skip > 0 { + merger.skip_rows::(self.skip, output_batch_size); + } while let Some((key_collector, pruned_batch)) = merger.next::(output_batch_size)? { @@ -727,6 +743,9 @@ impl ExternalSorter { let spill_blocks = spills.into_iter().map(|spill| spill.block).collect(); let mut merger = Merger::try_new(self.to_arc(), spill_blocks)?; + if self.skip > 0 { + merger.skip_rows::(self.skip, output_batch_size); + } while let Some((key_collector, pruned_batch)) = merger.next::(output_batch_size)? 
{ @@ -1023,6 +1042,22 @@ impl Merger { } Ok(Some((key_collector, pruned_batch))) } + + pub fn skip_rows( + &mut self, + skip: usize, + suggested_batch_size: usize, + ) -> Result<()> { + let mut remaining = skip; + while remaining > 0 { + let batch_size = remaining.min(suggested_batch_size); + if self.next::(batch_size)?.is_none() { + break; + } + remaining -= batch_size; + } + Ok(()) + } } fn merge_blocks( @@ -1468,7 +1503,7 @@ mod test { options: SortOptions::default(), }]; - let sort = SortExec::new(input, sort_exprs, Some(6)); + let sort = SortExec::new(input, sort_exprs, 0, Some(6)); let output = sort.execute(0, task_ctx)?; let batches = common::collect(output).await?; let expected = vec![ @@ -1487,6 +1522,40 @@ mod test { Ok(()) } + + #[tokio::test] + async fn test_sort_i32_with_skip() -> Result<()> { + MemManager::init(100); + let session_ctx = SessionContext::new(); + let task_ctx = session_ctx.task_ctx(); + let input = build_table( + ("a", &vec![9, 8, 7, 6, 5, 4, 3, 2, 1, 0]), + ("b", &vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]), + ("c", &vec![5, 6, 7, 8, 9, 0, 1, 2, 3, 4]), + ); + let sort_exprs = vec![PhysicalSortExpr { + expr: Arc::new(Column::new("a", 0)), + options: SortOptions::default(), + }]; + + let sort = SortExec::new(input, sort_exprs, 3, Some(8)); + let output = sort.execute(0, task_ctx)?; + let batches = common::collect(output).await?; + let expected = vec![ + "+---+---+---+", + "| a | b | c |", + "+---+---+---+", + "| 3 | 6 | 1 |", + "| 4 | 5 | 0 |", + "| 5 | 4 | 9 |", + "| 6 | 3 | 8 |", + "| 7 | 2 | 7 |", + "+---+---+---+", + ]; + assert_batches_eq!(expected, &batches); + + Ok(()) + } } #[cfg(test)] @@ -1581,7 +1650,7 @@ mod fuzztest { schema.clone(), None, )?); - let sort = Arc::new(SortExec::new(input, sort_exprs.clone(), None)); + let sort = Arc::new(SortExec::new(input, sort_exprs.clone(), 0, None)); let output = datafusion::physical_plan::collect(sort.clone(), task_ctx.clone()).await?; let a = concat_batches(&schema, &output)?; let a_row_count = sort.clone().statistics()?.num_rows; From 2b7185248fd7c4552a518b3e4ae600e52d309e45 Mon Sep 17 00:00:00 2001 From: yew1eb Date: Tue, 23 Dec 2025 17:16:22 +0800 Subject: [PATCH 3/4] up --- native-engine/auron-serde/src/from_proto.rs | 4 ++-- .../datafusion-ext-plans/src/limit_exec.rs | 18 ++++++++--------- .../datafusion-ext-plans/src/sort_exec.rs | 20 +++++++++---------- 3 files changed, 21 insertions(+), 21 deletions(-) diff --git a/native-engine/auron-serde/src/from_proto.rs b/native-engine/auron-serde/src/from_proto.rs index ac76f7c87..4b420523d 100644 --- a/native-engine/auron-serde/src/from_proto.rs +++ b/native-engine/auron-serde/src/from_proto.rs @@ -316,11 +316,11 @@ impl TryInto> for &protobuf::PhysicalPlanNode { }); let fetch = sort.fetch_limit.as_ref(); - let offset = fetch.map(|f| f.offset as usize).unwrap_or(0); let limit = fetch.map(|f| f.limit as usize); + let offset = fetch.map(|f| f.offset as usize).unwrap_or(0); // always preserve partitioning - Ok(Arc::new(SortExec::new(input, exprs, offset, limit))) + Ok(Arc::new(SortExec::new(input, exprs, limit, offset))) } PhysicalPlanType::BroadcastJoinBuildHashMap(bhm) => { let input: Arc = convert_box_required!(bhm.input)?; diff --git a/native-engine/datafusion-ext-plans/src/limit_exec.rs b/native-engine/datafusion-ext-plans/src/limit_exec.rs index f975ffacc..d40e837fb 100644 --- a/native-engine/datafusion-ext-plans/src/limit_exec.rs +++ b/native-engine/datafusion-ext-plans/src/limit_exec.rs @@ -42,17 +42,17 @@ use crate::common::execution_context::ExecutionContext; pub 
struct LimitExec { input: Arc<dyn ExecutionPlan>, limit: usize, - skip: usize, + offset: usize, pub metrics: ExecutionPlanMetricsSet, props: OnceCell<PlanProperties>, } impl LimitExec { - pub fn new(input: Arc<dyn ExecutionPlan>, limit: usize, skip: usize) -> Self { + pub fn new(input: Arc<dyn ExecutionPlan>, limit: usize, offset: usize) -> Self { Self { input, limit, - skip, + offset, metrics: ExecutionPlanMetricsSet::new(), props: OnceCell::new(), } @@ -61,7 +61,7 @@ impl LimitExec { impl DisplayAs for LimitExec { fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { - write!(f, "LimitExec(limit={},skip={})", self.limit, self.skip) + write!(f, "LimitExec(limit={},offset={})", self.limit, self.offset) } } @@ -100,7 +100,7 @@ impl ExecutionPlan for LimitExec { Ok(Arc::new(Self::new( children[0].clone(), self.limit, - self.skip, + self.offset, ))) } @@ -111,10 +111,10 @@ impl ExecutionPlan for LimitExec { ) -> Result<SendableRecordBatchStream> { let exec_ctx = ExecutionContext::new(context, partition, self.schema(), &self.metrics); let input = exec_ctx.execute_with_input_stats(&self.input)?; - if self.skip == 0 { + if self.offset == 0 { execute_limit(input, self.limit, exec_ctx) } else { - execute_limit_with_skip(input, self.limit, self.skip, exec_ctx) + execute_limit_with_offset(input, self.limit, self.offset, exec_ctx) } } @@ -123,7 +123,7 @@ impl ExecutionPlan for LimitExec { self.input.statistics()?, self.schema(), Some(self.limit), - self.skip, + self.offset, 1, ) } @@ -154,7 +154,7 @@ fn execute_limit( })) } -fn execute_limit_with_skip( +fn execute_limit_with_offset( mut input: SendableRecordBatchStream, limit: usize, offset: usize, diff --git a/native-engine/datafusion-ext-plans/src/sort_exec.rs b/native-engine/datafusion-ext-plans/src/sort_exec.rs index 74887b483..0f8608d3d 100644 --- a/native-engine/datafusion-ext-plans/src/sort_exec.rs +++ b/native-engine/datafusion-ext-plans/src/sort_exec.rs @@ -86,8 +86,8 @@ const NUM_MAX_MERGING_BATCHES: usize = 32; pub struct SortExec { input: Arc<dyn ExecutionPlan>, exprs: Vec<PhysicalSortExpr>, - skip: usize, - limit: Option<usize>, + limit: Option<usize>, + offset: usize, metrics: ExecutionPlanMetricsSet, record_output: bool, props: OnceCell<PlanProperties>, } @@ -97,15 +97,15 @@ impl SortExec { pub fn new( input: Arc<dyn ExecutionPlan>, exprs: Vec<PhysicalSortExpr>, - skip: usize, - limit: Option<usize>, + limit: Option<usize>, + offset: usize, ) -> Self { let metrics = ExecutionPlanMetricsSet::new(); Self { input, exprs, - skip, - limit, + limit, + offset, metrics, record_output: true, props: OnceCell::new(), } @@ -132,8 +132,8 @@ pub fn create_default_ascending_sort_exec( options: Default::default(), }) .collect(), - 0, - None, + None, + 0 ); if let Some(execution_plan_metrics) = execution_plan_metrics { sort_exec.metrics = execution_plan_metrics; @@ -189,8 +189,8 @@ impl ExecutionPlan for SortExec { Ok(Arc::new(Self::new( children[0].clone(), self.exprs.clone(), - self.skip, - self.limit, + self.limit, + self.offset ))) } @@ -212,7 +212,7 @@ impl ExecutionPlan for SortExec { self.input.statistics()?, self.schema(), self.limit, - self.skip, + self.offset, 1, ) } @@ -234,7 +234,7 @@ impl SortExec { mem_consumer_info: None, weak: Weak::new(), prune_sort_keys_from_batch: prune_sort_keys_from_batch.clone(), - skip: self.skip, + skip: self.offset, limit: self.limit.unwrap_or(usize::MAX), record_output: self.record_output, in_mem_blocks: Default::default(), @@ -1503,7 +1503,7 @@ mod test { options: SortOptions::default(), }]; - let sort = SortExec::new(input, sort_exprs, 0, Some(6)); + let sort = SortExec::new(input, sort_exprs, Some(6), 0); let output = sort.execute(0, task_ctx)?; let batches = common::collect(output).await?; let expected = vec![ @@ -1538,7 +1538,7 @@ mod test { options: 
SortOptions::default(), }]; - let sort = SortExec::new(input, sort_exprs, 3, Some(8)); + let sort = SortExec::new(input, sort_exprs, Some(8), 3); let output = sort.execute(0, task_ctx)?; let batches = common::collect(output).await?; let expected = vec![ @@ -1650,7 +1650,7 @@ mod fuzztest { schema.clone(), None, )?); - let sort = Arc::new(SortExec::new(input, sort_exprs.clone(), 0, None)); + let sort = Arc::new(SortExec::new(input, sort_exprs.clone(), None, 0)); let output = datafusion::physical_plan::collect(sort.clone(), task_ctx.clone()).await?; let a = concat_batches(&schema, &output)?; let a_row_count = sort.clone().statistics()?.num_rows; From f4479eb13d851e879c8e43f3aa44b9ab098003ad Mon Sep 17 00:00:00 2001 From: yew1eb Date: Tue, 23 Dec 2025 17:29:52 +0800 Subject: [PATCH 4/4] up --- native-engine/datafusion-ext-plans/src/limit_exec.rs | 2 +- native-engine/datafusion-ext-plans/src/sort_exec.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/native-engine/datafusion-ext-plans/src/limit_exec.rs b/native-engine/datafusion-ext-plans/src/limit_exec.rs index d40e837fb..245059dd2 100644 --- a/native-engine/datafusion-ext-plans/src/limit_exec.rs +++ b/native-engine/datafusion-ext-plans/src/limit_exec.rs @@ -272,7 +272,7 @@ mod test { } #[tokio::test] - async fn test_limit_with_skip() -> Result<()> { + async fn test_limit_with_offset() -> Result<()> { let input = build_table( ("a", &vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]), ("b", &vec![9, 8, 7, 6, 5, 4, 3, 2, 1, 0]), diff --git a/native-engine/datafusion-ext-plans/src/sort_exec.rs b/native-engine/datafusion-ext-plans/src/sort_exec.rs index 0f8608d3d..67ba383c3 100644 --- a/native-engine/datafusion-ext-plans/src/sort_exec.rs +++ b/native-engine/datafusion-ext-plans/src/sort_exec.rs @@ -133,7 +133,7 @@ pub fn create_default_ascending_sort_exec( }) .collect(), None, - 0 + 0, ); if let Some(execution_plan_metrics) = execution_plan_metrics { sort_exec.metrics = execution_plan_metrics; @@ -190,7 +190,7 @@ impl ExecutionPlan for SortExec { children[0].clone(), self.exprs.clone(), self.limit, - self.offset + self.offset, ))) }
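
Across the four commits the offset support settles into two paths: LimitExec slices batches on the fly, emitting rows [offset, limit) as sketched earlier, while SortExec::new(input, exprs, limit, offset) pushes the offset into the merge phase (for both in-memory blocks and spills), replacing patch 1's interim approach of wrapping the sort in LimitExec::new(plan, usize::MAX, offset) so that skipped rows are never materialized into output batches. Below is a standalone model of that merger-side skip; FakeMerger and its row counting are illustrative stand-ins for the engine's Merger, whose generic key-collector parameter is omitted here:

    // Standalone model of the merger-side OFFSET handling added in sort_exec.rs.
    // FakeMerger stands in for the engine's Merger; only the counting matters.
    struct FakeMerger {
        rows_left: usize,
    }

    impl FakeMerger {
        // Emit up to `batch_size` sorted rows; None once the stream is exhausted.
        fn next(&mut self, batch_size: usize) -> Option<usize> {
            if self.rows_left == 0 {
                return None;
            }
            let n = batch_size.min(self.rows_left);
            self.rows_left -= n;
            Some(n)
        }

        // Drain and discard `skip` rows before any batch is sent downstream.
        fn skip_rows(&mut self, skip: usize, suggested_batch_size: usize) {
            let mut remaining = skip;
            while remaining > 0 {
                let batch_size = remaining.min(suggested_batch_size);
                match self.next(batch_size) {
                    Some(n) => remaining -= n, // count rows actually produced
                    None => break,             // input shorter than the offset
                }
            }
        }
    }

    fn main() {
        // Mirrors test_sort_i32_with_skip: 10 sorted rows, OFFSET 3, LIMIT 8
        // (an end index), so rows 3..8 remain for downstream consumers.
        let mut merger = FakeMerger { rows_left: 10 };
        merger.skip_rows(3, 8192);
        assert_eq!(merger.rows_left, 7);
    }

One design note: this sketch decrements by the rows each call actually returns, whereas Merger::skip_rows in the patch decrements by the requested batch_size; the two are equivalent as long as the merger fills every batch it returns before the stream is exhausted.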